MaxCompute基础与MaxCompute SQL优化

简介: 总论:大数据计算服务 ( MaxCompute,原名 ODPS ) 是一种快速、完全托管的 TB/PB 级数据仓库解决方案 。MaxCompute 向用户提供了完善的数据导入方案以及多种经典的分布式计算模型,能够更快速的解决用户海量数据计算问题,有效降低企业成本,并保障数据安全 。
总论:

大数据计算服务 ( MaxCompute,原名 ODPS ) 是一种快速、完全托管的 TB/PB 级数据仓库解决方案 。MaxCompute 向用户提供了完善的数据导入方案以及多种经典的分布式计算模型,能够更快速的解决用户海量数据计算问题,有效降低企业成本,并保障数据安全 。同时,大数据开发套件和 MaxCompute关系紧密,大数据开发套件为 MaxCompute 提供了一站式的数据同步,任务开发,数据工作流开发,数据管理和数据运维等功能,您可以参见 大数据开发套件简介 来对其进行深入了解 。

MaxCompute 主要服务于批量结构化数据的存储和计算,可以提供海量数据仓库的解决方案以及针对大数据的分析建模服务 。随着社会数据收集手段的不断丰富及完善,越来越多的行业数据被积累下来 。数据规模已经增长到了传统软件行业无法承载的海量数据(百 GB、TB、乃至 PB)级别 。

在分析海量数据场景下,由于单台服务器的处理能力限制,数据分析者通常采用分布式计算模式 。但分布式的计算模型对数据分析人员提出了较高的要求,且不易维护 。使用分布式模型,数据分析人员不仅需要了解业务需求,同时还需要熟悉底层计算模型 。MaxCompute 的目的是为用户提供一种便捷的分析处理海量数据的手段 。用户可以不必关心分布式计算细节,从而达到分析大数据的目的 。

   首先 MaxCompute 不同于普通的mysql,oracle这样的关系型数据库,它其实是一个综合性的数据服务平台,它并不能在毫秒级甚至秒级返回查询结果,一条odps命令的执行通常需要经过如下流程:

提交作业(简易流程):

  1. 提交一个SQL语句,发送 RESTful 请求给HTTP服务器
  2. HTTP 服务器做用户认证。认证通过后,请求就会以 Kuafu通信协议方式发送给 Worker。
  3. Worker判断该请求作业是否需要启动Fuxi Job。如果不需要,本地执行并返回结果。如果需要,则生成一个 instance, 发送给 Scheduler。
  4. Scheduler把instance信息注册到 OTS,将其状态置成 Running。Scheduler 把 instance 添加到 instance 队列。
  5. Worker把 Instance ID返回给客户端。

运行作业(简易流程):

  1. Scheduler会把instance拆成多个Task,并生成任务流DAG图。
  2. 把可运行的Task 放入到优先级队列TaskPool中。
  3. Scheduler 有一个后台线程定时对TaskPool 中的任务进行排序。Scheduler 有一个后台线程定时查询计算集群的资源状况。Executor在资源未满的情况下,轮询TaskPool,请求Task。Scheduler判断计算资源。若集群有资源,就将该Task发给Executor。
  4. Executor调用SQL Parse Planner,生成SQL Plan。Executor 将 SQL Plan 转换成计算层的 FuXi Job 描述文件。Executor 将该描述文件提交给计算层运行,并查询 Task 执行状态。Task 执行完成后,Executor更新 OTS 中的 Task信息,并汇报给Scheudler。
  5. Schduler 判断 instance 结束,更新 OTS 中 instance 信息,置为 Terminated。

查询状态:

客户端接收到返回的 Instance ID 后,可以通过 Instance ID 来查询作业状态:

  1. 客户端会发送另一个 REST 的请求,查询作业状态。
  2. HTTP 服务器根据配置信息做用户认证。用户认证通过后,把查询的请求发送给 Worker。
  3. Worker 根据 InstanceID 去 OTS 中查询该作业的执行状态。Worker 将查询到的执行状态返回给客户端。
    其实MaxCompute是一个透明的数据服务平台,用户不需要了解分布式数据处理的细节,就可以在client上比较方便的处理PB级别的数据了。所以,在了解了以上内容之后,对于 MaxCompute 只能在分钟级别返回结果就有一个比较清楚的理解了。
ps:以上这些内容在大数据开发套件中都是透明的。

MaxCompute SQL基础:


    
     MaxCompute SQL 与普通关系型数据库的SQL大体类似,不同在于MaxCompute不支持如事务、主键约束、索引等,可以看成标准SQL的子集。

    DDL:data define language

     MaxCompute  操作以表为基础,ddl中涉及到对表的一系列操作,包括create,drop,alter
    我们以大数据开发套件上的一张表为例:
CREATE TABLE IF NOT EXISTS xxxx
(
    aa     STRING COMMENT 'xxxx',
    bb     STRING COMMENT 'xxxx',
    cc     STRING COMMENT 'xxxx',
    dd     STRING COMMENT 'xxxx',
    ee     STRING COMMENT 'xxxx',
    ff     STRING COMMENT 'xxx',
    gg     BIGINT COMMENT 'xxx'
)
COMMENT 'xxxx'
PARTITIONED BY (dt     STRING COMMENT '')
LIFECYCLE 10;

在工作流中我们希望任务能够顺利的执行,所以不管是DDL和DML中我们都尽量希望语句返回成功(if not exist,overwrite)
comment包括对应字段的注释和对应表的注释,这些都可以alter
与传统的SQL不同,MaxCompute面向全域数据,所以即使是用create xxxx select  xxxx from xxx的方式也需要as加上列的名称。
分区字段注明,由于MaxCompute操作的数据量很大,通常来说分区字段需要特别关注
生命周期:非常方便的属性,便于用户释放存储空间,简化回收数据的流程,不需要传统的繁杂的空间维护。灵活运用LastDataModifiedTime与touch(修改为当前时间),关注分区表和非分区表的区别。
对于大表结构的复制,odps提供非常灵活的create like语句。
alter几乎可以对表的所有属性进行更改,包括列,注释,分区,分区属性,生命周期等等。


 

DML:data manipulation language


    常见的比如insert,select,join
   insert overwrite|into table tablename
[partition (partcol1=val1, partcol2=val2 ...)] select_statement from from_statement;

     静态分区,分区字段常量;动态分区,可以不指定值,适用select字句中的分区列值
     multi-insert 单次读入,多次写入,减少数据读取。 

     

   select [all | distinct] select_expr, select_expr, ...
        from table_reference
        [where where_condition]
        [group by col_list]
        [order by order_condition]
        [distribute by distribute_condition [sort by sort_condition] ]
        [limit number]

     与传统的SQL不同的是,distinct作用所有select字段

     编译过程group > select > order/sort/distribute,理解了编译顺序也就理解了各个字句间别名的使用规范。
     distribute by:对数据按照某几列的值做hash分片。
     sort by:局部排序,语句前必须加distribute by。实际上sort by是对distribute by的结果进行局部排序。
     从功能的理解可知:order by不和distribute by/sort by共用,同时group by也不和distribute by/sort by共用。
     
     join和传统sql的表现较为一致,odps支持left outer join,right outer join,full outer join,inner join。
     mapjoin hint:当大表和小表join的情况下利用mapjoin将用户指定的小表全部加载到内存中,从而加快join的执行速度,同时支持非等值连接,full join 不可用,连接的主表需为大表

内建函数:


    主要包括数学与统计函数,字符串操作函数,时间函数,窗口函数,聚合函数,转置函数等
    就不一一列举了,功能强大。
    

UDF:user defined function


   包括udf,udtf,udaf
   udf:用户自定义标量函数
   udtf:用户自定义表值函数(返回多个字段)
   udaf:用户自定义聚合函数
   UDF:
  package org.alidata.odps.udf.examples;
     import com.aliyun.odps.udf.UDF;
     public final class Lower extends UDF {
       public String evaluate(String s) {
         if (s == null) { return null; }
         return s.toLowerCase();
       }
     }

 继承UDF类,实现evaluate方法即可。evaluate方法可以有多个,满足多态特性。

 UDAF:
继承com.aliyun.odps.udf.Aggregator,主要实现iterate,merge和terminate三个接口,UDAF的主要逻辑依赖于这三个接口的实现。此外,还需要用户实现自定义的Writable buffer,因为UDAF的主要逻辑是将数据进行分片后遍历,处理完之后进行merge。

UDTF:
继承com.aliyun.odps.udf.UDTF类,主要实现process和forward两个接口,SQL中每一条记录都会对应调用一次process,process的参数为UDTF的输入参数。输入参数以Object[]的形式传入,输出结果通过调用forward函数输出。

UDF统一添加方法:
add jar xxx
create function xxx as packagename.classname using 'jarname'
    

PL:存储过程


与传统sql类型,区别在于变量引用时前面加$
DECLARE
 var_name var_type;
BEGIN
 可执行语句
END;

    其他命令:


     explain,show instance,merge smallfile,添加/移除/显示统计信息(add/remove/show statistc 统计值或者符合某个表达式的值)

MaxCompute SQL优化与大数据开发套件:

选表原则:

  • 选择满足需求的小表,比如汇总表。维表尽量选择全量表,事实表尽量选择增量表;
  • 选择产出早的表;
  • 选择可回滚的表,比如使用加购事件表代替加购全流程表;
  • 依赖的N个上游表,尽量保证上游产出时间要均匀,如果有差异,考虑换依赖表;

小表原则:

  • 行数小于100万的表认为是小表,这个时候使用mapjoin性能会提高很多;
  • 读取数据的时候要加上分区等过滤条件,大表变小表。常用过滤条件字段,做成动态分区,方便下游过滤;
  • 不得不读取N天大表的时候,使用unionall方式合并多天数据;

代码原则:

  • Join关联要尽可能是主键关联。关联字段类型要一致;
  • 多天汇总,先生成1天轻度汇总表,多天使用1天数据再汇总;
  • multiinsert,实现一次读取多次写入;
  • 使用系统UDF代替自己的写的UDF;

调度原则:

  • 依赖max_pt的,要排除当天依赖;
  • 上游是小时任务,使用max_pt要慎重;
  • 执行超过1个小时任务要关注;
大数据开发套件:

大数据开发套件提供了直观的数据操作入口,数据研发过程代码的编写,调试,优化,发布都可以在大数据开发套件中进行。
拿一个任务耗时过长作例子,看看在大数据开发套件上我们是怎么处理碰到的问题的。

一个task执行时间过长,除掉本身代码的性能问题,那么有两种比较大的可能:
一种是等待问题,一种是数据倾斜问题
等待问题可能是由于系统资源不足,系统繁忙,优先级不够,数据量太大,碰到了坏盘等原因导致的
我们可以通过调整优先级,重跑,过滤初始数据等方法来处理。

倾斜问题则一般是数据本身的问题,常见的数据倾斜是怎么造成的?

Shuffle的时候,将各个节点上相同的key拉取到某个节点的一个task进行处理,比如按照key进行聚合或join等操作,如果某个key对应的数据量特别大的话,就会发生数据倾斜现象。数据倾斜就成为了整个task运行时间的短板。

触发shuffle的常见算子:distinct、groupBy、join等。

要解决数据倾斜的问题,首先要定位数据倾斜发生在什么地方,首先是哪个stage,直接在D2 UI上看就可以,查看数据是否倾斜了
logview--odps task--detail--stage--longtail

根据stage日志,判断出数据倾斜发生在哪个算子上。

根据倾斜发生的阶段,我们又可以把它们分为map倾斜,reduce倾斜,join倾斜

通常来说,对于倾斜现象,我们首先查看导致数据倾斜的key的数据分布情况,接下来大概有几种处理方案:

1:过滤数据
过滤掉某些脏数据,比如说是否可以去掉null,去掉某些条件对应的值
2:加大并行度
给任务添加处理资源,加大instance的数量,暴力
3:对数据进行拆分,分而治之
如果大表join小表,我们可以用mapjoin,将小表cache进内存
二次分发,加上随机前缀(数据膨胀),拆分数据集为热点+非热点再进一步处理
大表join超大表,还可以考虑bloomfilter
4:组合使用
上述方法,组合使用
5:修改业务
实在没有进步空间,从业务上过滤数据

相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps 
目录
相关文章
|
3天前
|
SQL 分布式计算 关系型数据库
实时数仓 Hologres产品使用合集之湖仓加速版查询maxcompute外部表,有什么优化途径吗
实时数仓Hologres的基本概念和特点:1.一站式实时数仓引擎:Hologres集成了数据仓库、在线分析处理(OLAP)和在线服务(Serving)能力于一体,适合实时数据分析和决策支持场景。2.兼容PostgreSQL协议:Hologres支持标准SQL(兼容PostgreSQL协议和语法),使得迁移和集成变得简单。3.海量数据处理能力:能够处理PB级数据的多维分析和即席查询,支持高并发低延迟查询。4.实时性:支持数据的实时写入、实时更新和实时分析,满足对数据新鲜度要求高的业务场景。5.与大数据生态集成:与MaxCompute、Flink、DataWorks等阿里云产品深度融合,提供离在线
|
2天前
|
分布式计算 DataWorks 大数据
MaxCompute产品使用合集之odps.sql.mapper.split.size和odps.stage.mapper.split.size这两个参数的区别是什么
MaxCompute作为一款全面的大数据处理平台,广泛应用于各类大数据分析、数据挖掘、BI及机器学习场景。掌握其核心功能、熟练操作流程、遵循最佳实践,可以帮助用户高效、安全地管理和利用海量数据。以下是一个关于MaxCompute产品使用的合集,涵盖了其核心功能、应用场景、操作流程以及最佳实践等内容。
|
2天前
|
SQL 分布式计算 大数据
MaxCompute产品使用合集之如何在本地IDE(如IntelliJ IDEA)中配置MaxCompute (mc) 的任务和调试SQL
MaxCompute作为一款全面的大数据处理平台,广泛应用于各类大数据分析、数据挖掘、BI及机器学习场景。掌握其核心功能、熟练操作流程、遵循最佳实践,可以帮助用户高效、安全地管理和利用海量数据。以下是一个关于MaxCompute产品使用的合集,涵盖了其核心功能、应用场景、操作流程以及最佳实践等内容。
|
2天前
|
SQL 机器学习/深度学习 分布式计算
MaxCompute产品使用合集之sql代码中支持插入jinja语法语句吗
MaxCompute作为一款全面的大数据处理平台,广泛应用于各类大数据分析、数据挖掘、BI及机器学习场景。掌握其核心功能、熟练操作流程、遵循最佳实践,可以帮助用户高效、安全地管理和利用海量数据。以下是一个关于MaxCompute产品使用的合集,涵盖了其核心功能、应用场景、操作流程以及最佳实践等内容。
|
2天前
|
存储 分布式计算 大数据
MaxCompute产品使用合集之在sql里调用自定义的udf时,设置一次同时处理的数据行数,是并行执行还是串行执行的
MaxCompute作为一款全面的大数据处理平台,广泛应用于各类大数据分析、数据挖掘、BI及机器学习场景。掌握其核心功能、熟练操作流程、遵循最佳实践,可以帮助用户高效、安全地管理和利用海量数据。以下是一个关于MaxCompute产品使用的合集,涵盖了其核心功能、应用场景、操作流程以及最佳实践等内容。
|
2天前
|
SQL 分布式计算 DataWorks
DataWorks操作报错合集之在执行SQL时,出现了权限问题。错误代码为odps-0433121,是什么导致的
DataWorks是阿里云提供的一站式大数据开发与治理平台,支持数据集成、数据开发、数据服务、数据质量管理、数据安全管理等全流程数据处理。在使用DataWorks过程中,可能会遇到各种操作报错。以下是一些常见的报错情况及其可能的原因和解决方法。
|
4天前
|
分布式计算 DataWorks 关系型数据库
MaxCompute产品使用合集之DataWorks是否支持通过SQL方式在MaxCompute中查询数据,并通过数据集成服务将查询结果同步至MySQL数据库
MaxCompute作为一款全面的大数据处理平台,广泛应用于各类大数据分析、数据挖掘、BI及机器学习场景。掌握其核心功能、熟练操作流程、遵循最佳实践,可以帮助用户高效、安全地管理和利用海量数据。以下是一个关于MaxCompute产品使用的合集,涵盖了其核心功能、应用场景、操作流程以及最佳实践等内容。
|
2天前
|
SQL DataWorks NoSQL
DataWorks产品使用合集之如何将SQL Server中的数据转存到MongoDB
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
|
26天前
|
SQL API 流计算
实时计算 Flink版产品使用合集之在Mac M1下的Docker环境中开启SQL Server代理的操作步骤是什么
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。

热门文章

最新文章

相关产品

  • 云原生大数据计算服务 MaxCompute