一条SQL在 MaxCompute 分布式系统中的旅程

本文涉及的产品
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
简介: 2019杭州云栖大会大数据技术专场,由阿里云资深技术专家侯震宇、阿里云高级技术专家陈颖达以及阿里云资深技术专家戴谢宁共同以“SQL在MaxCompute分布式系统中的旅程 ”为题进行了演讲。本文首先介绍了MaxCompute计算平台及其特点、超大规模企业级SQL引擎和其功能。然后讲解了如何构建企业级分布式智能调度执行框架。最后介绍了新一代列式存储引擎AliOrc及优化方式。

摘要:2019杭州云栖大会大数据技术专场,由阿里云资深技术专家侯震宇、阿里云高级技术专家陈颖达以及阿里云资深技术专家戴谢宁共同以“SQL在 MaxCompute 分布式系统中的旅程 ”为题进行了演讲。本文首先介绍了 MaxCompute 计算平台及其特点、超大规模企业级SQL引擎和其功能。然后讲解了如何构建企业级分布式智能调度执行框架。最后介绍了新一代列式存储引擎AliOrc及优化方式。

视频直播回放 >>>
以下为精彩视频内容整理:


MaxCompute–面向企业的超大规模计算

  • 全托管,多租户,超大规模平台

    MaxCompute拥有庞大的用户群体,支持阿里集团的各个关键业务和复杂场景,支持多个互联网新兴企业核心业务,以及支持关系国计民生、国家安全的关键行业。同时拥有超大规模计算存储,包括单日千万以上计算任务、多EB级别存储量、10万台以上服务器以及全球超过十个数据中心。
  • 企业级高性能计算引擎

    TPC-BigBench是更面向于大数据运算的BigBench,覆盖一些复杂类型,包括机器学习场景,更贴近于大数据场景的业务。在2017年,阿里的TPC-BigBench首个通过100TB 验证的引擎。在2018年,TPC-BigBench 首个达到18000+引擎。在2019年,进一步提升到 25000+,正式公布到TPC官网。

MaxCompute不仅仅在阿里集团内部被广泛的使用,也支撑着许多著名的互联网方面的厂商,以及关系到国计民生、国家安全方面的应用。

image.png

超大规模企业级SQL引擎–MaxCompute UniSQL

一条SQL在分布式系统中的旅程

image.png

上图所示为运行SQL任务中的大概流程。首先使用一条SQL语句,通过Compile,可以生成一个逻辑的执行计划,这个逻辑的执行计划是计算机能够理解的,再经过Optimize过程,无论逻辑计划有多复杂,都要翻译成针对目前集群和运行时刻的Runtime最优的物理执行计划,对于每一个Optimize不一定与原始的SQL相关。然后经过计算调度框架,使得合理快速的安排执行任务。由调度框架做的安排应用到每台机器之后,每台机器都会有一个SQL的运行时(Runtime Engine),它是真正能够理解物理执行计划的,并且一步一步把数据从Storage中读出来,再经过Shuffle得到结果,最后返回到Storage中。可以看出,运行时本身的性能是非常关键的,仅仅一条SQL语句有可能消耗几百T的data,这时,Storage的性能也是至关重要的。

SQL的功能

  • Not Only SQL – 脚本模式

    image.png

上图为SQL的一个脚本,上面是配置语句,下面是创建的表,每句都是SQL的语句,但是这些SQL语句都可以串在一个脚本里,当表述一个非常复杂的逻辑时,不需要把脚本写成嵌套的形式,这种方式更灵活,能够支撑更复杂的业务场景。阿里内部有非常复杂的业务场景,过去不支持这种方式时,用户是使用嵌套的方式,使脚本变得复杂和扭曲,并且有大量的重复,以致不能解决时,就会将其拆分,再通过外部调动的方式串连起来。因为人如果承受不了维护的代价,就要引用额外性能的开销,后面的语句就要引用前面的语句。不管脚本有多复杂,经过编译器之后,还是一个单一完整的执行计划,并不会带来额外的开销。优化器看到的上下文越多,优化的机会越多,形成单一完整的执行计划之后,就可以以最高效的方式执行整个业务模式。此外,DataWorks也是支持这种模式的。通过脚本模式可以效仿C++或者Java来写SQL。

  • Not Only SQL– 参数化视图

    image.png

例如,在写C++或者Java时,经常在公共的逻辑中抽取出来一个函数,把公共的逻辑放到某一个模块里,这个过程视为代码的重用机制。但是,标准的SQL,尤其是大数据的SQL是缺乏这种机制的。对于阿里这么复杂的场景,这种需求是很迫切的。底层的数据集提供了各个部门都需要的基础的数据,不同的业务部门可能都要消耗这部分数据,消耗的方式是不同的。这时,也想像C++或者Java那样抽取一个函数,在MaxCompute中是可以实现的。在MaxCompute里,上图中的红框除了作为普通的view,也可以封裝一些SQL复杂的逻辑和对数据的读取,可以把表的变量传入,这时就可以实现像C++或者Java中函数的功能,可以把SQL里公共的业务逻辑封装在一起,同时结合上文的脚本模式,参数化视图就可以组织非常复杂的SQL的业务逻辑用来支撑非常复杂的业务场景。

  • Not Only SQL – IF/ELSE

    image.png

一般的大数据不支持IF/ELSE,但对于IF/ELSE是有需求的。例如,每周做一次全量的计算,但每天只做增量的计算,如果没有IF/ELSE的支持,就需要把脚本拆成两个,通过调度的框架串连起来。但是,在MaxCompute中结合脚本模式,可以直接的写入IF语句或SELECT语句,如果返回的是异常的结果,直接可以放在一个表达式里,决定执行SQL的分支。所有的SQL的功能都是针对复杂的应用场景的需要。

  • Not Only SQL – UDT & TRANSFROM

    image.png

普通的SQL都会有基本的数据类型,有时也有复杂类型,但都是属于给定范围的数据类型。当数据类型特别复杂时,在MaxCompute里可以直接使用。右侧框架是将Java和SQL无缝的融合在一起,无需UDF封装。左侧为SELECT TRANSFORM,是直接就可以在SQL里调用shell脚本,并且完全兼容Hive。

SQL的性能

  • SQL Engine for Huge Data - Adaptive Join

    image.png

Adaptive Join包括Hash Join和Merge Join。Hash Join的性能是比较好的,但有时碰到不合适的场景时,特别是有非常多的Hash冲突时,性能就会变得很差。Merge Join的特点是能够提供一个性能的下限。可以通过动态的选择适合哪种场景,以便做智能的选取。

  • SQL Engine for Huge Data – Advanced Shuffle

    image.png

Shuffle也有针对特定大规模系统的优化,包括提升Shuffle 70%的性能,提升大规模共享集群性能,提升稳定性,降低IO压力。具体包括以下优化方式:

1、Greysort模式(Mapper不排序,Reducer排序),增加与下游流水线机会;下游转化为HashJoin时消除排序
2、Encoding & Adaptive列式压缩,降低IO与Cache Miss
3、优化内存结构,降低Working Set Size并消除Pointer Chasing

企业级分布式智能调度执行框架

打造企业级分布式调度执行系统

image.png

整个系统的发展有两个维度,一个维度是系统的规模,随着系统规模的不断成长,对于分布式调度执行系统要面对每天千万级需要解决的问题,在阿里这个大体量的数据下,单个分布式作业规模已经能达到数十万个计算节点,已经有上百亿连接和运行数万台的物理机。

另一个维度是系统的成熟度,一个系统成为企业级的分布式执行调度系统就需要达到成熟度,包括三个阶段,第一个阶段是可用性(正确性),一个作业在单机系统上执行的结果和分布式系统上执行的结果是不一样的,尤其是在系统的超大规模上,在面对系统各种各样的节点失败问题、网络层的失败问题和各种容灾问题时,怎样通过正确的方式能保证作业正确的产出是很重要的。第二阶段是够用,是指每一个计算的系统都要锻造自己的性能,能在各种各样的benchmark上标准结果,通过此方法来提升性能。第三个阶段是好用(智能化),是指在动态执行过程中拥有动态能力和自适应能力,可以根据作业的不同特点来调整作业执行的计划。

企业级分布式计算调度框架

企业级分布式计算调度框架分为三个阶段:

  • 动态的智能执行

    image.png

上图所示为阿里的一个作业在离开优化器以后,在分布式系统里执行的过程。可以理解为从逻辑图到物理图映射的过程。

image.png

上图所示为三个阶段的作业,第一个阶段是作业提交开始运行,第二个阶段是根据实际产出动态调整并发,第三个阶段是产生所需数据提前结束作业。

image.png

上图所示为智能化DAG执行的动态逻辑图,包括Sorted Merge Join和Broadcast Join两种算法。其中Sorted Merge Join的特点包括经典分布式join算法,可支持大规模作业,可用范围广(slow but reliable),代价较昂贵 (full shuffle + sort),且shuffle可能带来数据倾斜。Broadcast Join的特点是只适用特定类型作业 (一路输入可载入单计算节点内存),非适用场景上可能导致OOM,作业失败。

image.png

对动态的选择执行计划,在理想情况下都希望数据的分布是均匀的,并且可以理解数据的特性,所以优化器都可以做出“最佳”的计划,尤其是在做benchmark时,但是由于源数据统计不准确 、中间数据特性波动 ,所产生数据的特点是没有办法提前预估的,所以允许优化器来给一个非确定的执行计划(Conditional Join),这时,优化器会给出两个执行路径的计划,调度执行框架可以根据上游实际产生的数据量,动态的调整逻辑图的执行。

image.png

上图所示为并发度的例子。简单的并发调整是根据上游总数据量直接取平均作为并发,仅支持向下调整,但问题是数据可能是倾斜的,这种方法已不再适用。下面给出两种新的调度方法:

1、依据分区数据统计调整:避免并发调整加重数据倾斜,可向上向下调整。
2、分区统计基础上,自动切分大分区:双重调整,消除分区内的数据倾斜,并支持数据处理归并,以保留分区特性。

  • 高效作业管理

    image.png

对于阿里如此大规模的作业,调度的敏捷度是十分重要的,因为集群规模很大,一个作业怎样理解各个计算节点和物理机的状态,做智能的容错和预判性的容错是阿里所做的一项工作。随着作业规模越来越大,一个非常优秀的调度框架能带来的性能提升会越来越明显。

  • 多种计算模型融合

    image.png

阿里整个计算平台作为飞天的底座,不仅仅运行SQL,也有可能运行其他。最经典的SQL是batch执行。离线和一体式的执行是资源利用率和性能优化的两个极端,作为一个用户,会同时关注执行性能和资源利用率,需要思考的问题是,怎样在两个点中达到平衡。因此,阿里也支持一种称为bubble的调度,所谓bubble调度是允许一个作业的子图同时调度,下游的子图分布调度,在不同的SQL上会有不同的效果。例如。在TPCH11的情况下,相对于离线(batch)会有66%的性能提升,相对于一体式(all-in-one)会节省3倍的资源,同时获取95%的性能。

新一代列式存储引擎AliOrc

在AliOrc的里程中,起点和终点都是在存储层,数据的读和写是AliOrc执行的开始和结束,存储引擎作为AliOrc的底座,承担着一个非常重要的作用。

基于Apache Orc的深度优化

image.png

整个计算引擎是基于列结构的,技术的出发点是Apache Orc。在此基础上,阿里做了很多深度的优化,包括I/O维度、内存优化、索引和数据编码压缩。其中有一部分已经贡献到了社区。

新一代列式存储引擎

新一代列式存储引擎包括以下技术方面:

  • 并行化编码技术

    image.png

对于有一系列的大数和小数,直接存放时会产生4个字节,而对于小数,前面会产生很多的零,这些零是没有意义的。并行化编码技术的主要思想就是将冗余的信息删掉,将真正有意义的batch留下,并且pack到一起。这种编码方式的好处是能实现并行化。此外,还进行了一些扩展,包括对有序数据的优化,以及对数据的编码优化。同时重新设计了编码存储格式,更利于内存对齐,以及列存储。

image.png

从测试的结果来看,此编码技术比传统游程编码速度快4到6倍,压缩率提升大概10%左右,在反应到TPC Benchmark表扫描效率提升24%。之所以有如此快的结果,是因为使用AVX256一条指令可以处理8个64位数,或者16个32位数,同时充分利用函数模板展开,最大程度避免循环和分支预测失败。

  • 异步并行IO

    image.png

阿里是属于列存储引擎的,是指在同一个列是放在一起的,好处是在读数据时选择几个列放到存储引擎中去读,就不需要读所有的列。假设在上图中的场景中,有三个列为A、B、C。最早的IO模型是串行的,存在许多等待时间。因此,阿里做了一个改进为Prefetch模型,IO是不需要一个一个发出去的,在一开始时可以将三个读取引擎一起发出去,但是需要一个一个的等待它们回来,虽然有了一些提升,但是还仍然存在IO等待的时间。目前为止,改进的模型为Prefetch+Async Paraller IO,是将IO全部并行化,将三个一起发出去之后,并不需要按照原来A、B、C的顺序等待,可以按照回来的顺序做解压和解码。这样做可以对IO等待的时间降到最小。

image.png

如上图所示,异步并行IO与同步读取相比较,IO等待时间减少97%,端到端时间减少45%。

  • 延迟读取、延迟解码

    为了进一步的提高性能,减少数据读取量,从而减少数据解码、解压缩成为了关键。

image.png

如上图所示为延迟读取的一个例子,通过只读取DEPT列,把ADDRESS以及SALARY列延迟到过滤之后读取,可以大幅减少了不必要的数据读取。

image.png

对于字符串类型的列,有一种方法叫字典编码,是指将字符串里不一样的Key找出并且给予ID,这时,数据在存放时是不需要存放整个字符串的,只需要存放ID就可以。但是使用此方法是很耗时的。由此,做了以下改进:

image.png

使用延迟解码,跳过解码步骤,直接在字典上匹配,再以ID到数据列搜索。好处是减少了字符串匹配次数以及减少了字典解码时间。

image.png

如上图所示,对打开延迟读写和没有打开延迟读写做了比较,横坐标为filter过滤的数据,“1”表示没有过滤,纵轴是花费的时间,实现延迟读取之后,读取数据量随Selectivity的提升而减少,读取时间也相应大幅降低。


了解更多 MaxCompute 技术与产品详情,欢迎加入“MaxCompute开发者社区”,扫码或点击链接均可加入 https://h5.dingtalk.com/invite-page/index.html?bizSource=____source____&corpId=dingb682fb31ec15e09f35c2f4657eb6378f&inviterUid=E3F28CD2308408A8&encodeDeptId=0054DC2B53AFE745
image.png

相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps 
目录
相关文章
|
1月前
|
SQL 存储 测试技术
SQL在构建系统中的应用:关键步骤与技巧
在构建基于数据库的应用系统时,SQL(Structured Query Language)作为与数据库交互的核心语言,扮演着至关重要的角色
|
7天前
|
SQL 存储 算法
比 SQL 快出数量级的大数据计算技术
SQL 是大数据计算中最常用的工具,但在实际应用中,SQL 经常跑得很慢,浪费大量硬件资源。例如,某银行的反洗钱计算在 11 节点的 Vertica 集群上跑了 1.5 小时,而用 SPL 重写后,单机只需 26 秒。类似地,电商漏斗运算和时空碰撞任务在使用 SPL 后,性能也大幅提升。这是因为 SQL 无法写出低复杂度的算法,而 SPL 提供了更强大的数据类型和基础运算,能够实现高效计算。
|
16天前
|
分布式计算 Java 开发工具
阿里云MaxCompute-XGBoost on Spark 极限梯度提升算法的分布式训练与模型持久化oss的实现与代码浅析
本文介绍了XGBoost在MaxCompute+OSS架构下模型持久化遇到的问题及其解决方案。首先简要介绍了XGBoost的特点和应用场景,随后详细描述了客户在将XGBoost on Spark任务从HDFS迁移到OSS时遇到的异常情况。通过分析异常堆栈和源代码,发现使用的`nativeBooster.saveModel`方法不支持OSS路径,而使用`write.overwrite().save`方法则能成功保存模型。最后提供了完整的Scala代码示例、Maven配置和提交命令,帮助用户顺利迁移模型存储路径。
|
1月前
|
缓存 NoSQL Java
大数据-50 Redis 分布式锁 乐观锁 Watch SETNX Lua Redisson分布式锁 Java实现分布式锁
大数据-50 Redis 分布式锁 乐观锁 Watch SETNX Lua Redisson分布式锁 Java实现分布式锁
59 3
大数据-50 Redis 分布式锁 乐观锁 Watch SETNX Lua Redisson分布式锁 Java实现分布式锁
|
1月前
|
SQL 存储 数据库
SQL在构建系统中的应用:关键要素与编写技巧
在构建基于数据库的系统时,SQL(Structured Query Language)扮演着至关重要的角色
|
1月前
|
存储 缓存 NoSQL
大数据-38 Redis 高并发下的分布式缓存 Redis简介 缓存场景 读写模式 旁路模式 穿透模式 缓存模式 基本概念等
大数据-38 Redis 高并发下的分布式缓存 Redis简介 缓存场景 读写模式 旁路模式 穿透模式 缓存模式 基本概念等
60 4
|
1月前
|
缓存 NoSQL Ubuntu
大数据-39 Redis 高并发分布式缓存 Ubuntu源码编译安装 云服务器 启动并测试 redis-server redis-cli
大数据-39 Redis 高并发分布式缓存 Ubuntu源码编译安装 云服务器 启动并测试 redis-server redis-cli
55 3
|
2月前
|
数据采集 分布式计算 MaxCompute
MaxCompute 分布式计算框架 MaxFrame 服务正式商业化公告
MaxCompute 分布式计算框架 MaxFrame 服务于北京时间2024年09月27日正式商业化!
83 3
|
1月前
|
SQL 数据库连接 数据库
管理系统中的Visual Studio与SQL集成技巧与方法
在现代软件开发和管理系统中,Visual Studio(VS)作为强大的集成开发环境(IDE),与SQL数据库的紧密集成是构建高效、可靠应用程序的关键
|
1月前
|
SQL 消息中间件 分布式计算
大数据-143 - ClickHouse 集群 SQL 超详细实践记录!(一)
大数据-143 - ClickHouse 集群 SQL 超详细实践记录!(一)
71 0

相关产品

  • 云原生大数据计算服务 MaxCompute