主要分成四个部分。首先介绍阿里云自研的离线实时数仓,一体化的数仓,主要是MaxCompute和Hologres产品的各自核心的定位。第二部分是MaxCompute近实时方面的主要的能力升级的核心,第三部分是MaxCompute近实时关键升级部分的主要核心技术,最后一部分是近实时的展望。
一、阿里云自研离线实时一体化数仓
MaxCompute和Hologres是阿里大数据资源的两个全能产品,两个产品是有明确的核心定位,实际在一体化方面做很多优化来提高用户的使用体验。支持MaxCompute和Hologres做直读直写,同时也支持SQL做湖操作。但是两个产品是互补的关系。
首先从使用场景,MaxCompute的重点是在ETL的使用场景。作业基本上是小时级或者是天级别的,Hologres重点是AP分析和Serverless的场景,用户对实质性要求很高,比如毫秒级就要返回结果,另外在规模方面,MaxCompute是EB级的数仓,每天处理的数据是能到EB级,Hologres主要是PB级的数仓。从弹性方面,MaxCompute是可以理解成无限弹性,是可以处理整个互联网级的数据量,Hologres主要是以独享的资源形态为主,也推出Serverless,提高性价比。从语言特点的方面,MaxCompute主要是兼容Hive和Spark大数据生态的SQL语言。语言的强项是能做很复杂的Join关联。同时可以给用户很自由的UDF。在UDF中可以实现很多用户自定义的能力,Hologres主要是PG的生态,提供很多PG,比如地理函数方面的一些高级功能。
下面介绍离线和实时引擎方面,在业界流行的一些技术不局限于MaxCompute和Hologres,整个离线和实时的数仓,在使用的技术有大概的介绍,从离线和实时两个技术,优化的目标是不同的,离线的数仓主要是关注Throughput,实时的数仓可能更关注Latency,因为优化目标不同,所以使用的基础也有很大的差异,首先从调度方面,离线数仓更多的依赖scheduling的调度,是离线和实时在调度方面最明显的区别,实时数仓更多的用MPP,调度可能是all at once的,所有的Stage都是一起来调度的。从Runtime角度,离线数仓可能更依赖Codegen的技术,Codegen在很多的场景,比如很多列的做sort场景,如果有Codegen性能就会特别好,但是Codegen本身可能会比较耗时,比如复杂的Sort、光Codegen的时间可能要上百毫秒,时间对于实时的数仓不可以接受,所以实时数仓更多的是采用向量化的技术加速它的执行,另外离线的数仓在Runtime的角度,基本上都会使用pull的Model,就是vocadu Model,实时的技术更多的是采用Pipeline,是push的Model会更好的利用机器的多核的计算的能力。
在Shuffle的角度,离线技术,基本上都会采用Shuffle的Sevice,保证它的Shuffle的稳定性,并且做很细力度的Failover,实时基本上会采用In memory Shuffle方式,所有的Shuffle的数据都会是通过网络直连的方式发送。在实时的Runtime的执行过程中遇到OM,大多时候没有细粒度的Failover,可能需要用户直接去Runtime,从存储的角度,离线数仓更关注分层的存储,做到极致的性价比。实时数仓一般会用更多的内存或者SID存储的数据,以提高时延。离线和实时数仓技术都依赖于引擎的通用的能力,比如更好地利用Cost face和History历史的信息做执行计划的优化,同时要更快收集stash的能力。另外开源引擎还是自研的引擎,最近几年都转向native Runtime。很少在利用,比如java Runtime,无论是实时还是离线数仓都在强调在adaptive执行的能力适应不同的数据规模,离线和实时的技术也正在加速融合,比如调度是stage和MPP的调度,或者离线的数仓也有一些all at once像MPP的调度的方式,MPP的引擎为了支持更大的数据处理的规模,也会使用Stage调度,融合的第二个维度,在资源很受限的情况下,作业的执行的部分的Stage也是可以应用all at once的调度,一般在业界称为是bubble调度,Codegen和向量化也是一直在融合的两个技术。比如Codegen的时间耗时比较长,很多引擎可以Codegen和向量化执行是在同步进行的作业,开始执行的时候是使用向量化的方式,再异步的做Codegen,Codegen做好最优的执行计划之后,会再切换成Codegen的执行方式。
二、MaxCompute近实时介绍
下面介绍MaxCompute在近实时方面升级的核心。首先推出全新的Delta Table形态,Delta Table完全是MaxCompute自研的Table Format,同时支持upsert,也支持CDC的异步生成。会自动的管理全量和增量的数据,支持是分钟级的数据导入,Delta Table管理全量和增量数据能力之后,会应用增量数据做增量的计算。支持增量的查询,会有Stream Object新的抽象。同时增量MV也支持用户调整自己的刷新频率更好的平衡Latency和Throughput,最后一部分是MCQ 2.0,MCQ 1.0使用的场景比较受限,是支持PINM屏显的作业,MCQ 1.0之后,升级成MCQ 2.0里面是可以使用SQL的完整的功能,同时依赖强的Virtual warehouse的隔离环境,MCQ 2.0跑的会比1.0更稳定。通过一些全链路的Cache优化和异步的优化,MCQ 2.0可以做到秒级查询的延时,期望MC从离线数仓走近实时数仓之后,统一SQL引擎支持所有的用户的查询,同时也给用户更高性价比的选择。
下面介绍MC近实时的架构的升级,适应近实时的能力,最底层是依赖于Pangu的统一存储底座,但为了支持Delta Table,在开放存储StorageAPI和Table SDK Common Table方面都做了很多工作。也通过Manifest的管理做好全量和增量数据的映射,在弹性计算层,它是数据导入的入口,支持Delta Table的数据导入,Storage Service是改动很大的模块。之前Storage Service的定位是做小文件合并和分层存储冷数据的归档,为支持更好的管理增量的数据,Storage Service做了很多自动的优化,比如为提高查询的性能做auto的sort,把小文件合并,避免小文件影响读取的性能,也会异步的构建CDC,为了后续的增量查询,做增量数据的输入,Meta Service是管理所有的Delta Table的增量数据的版本,同时也要保证在不同的读写之间的Transaction的ACID的一致性保证,MC的SQL引擎也改动很多。比如要支持增量查询,SQL语法方面要做很多扩展,同时Optimizer要支持基于统一的Costbase的框架做MV的增量的刷新,增量刷新是有不同的算法,比如可以基于State的增量的算法,也可以支持Table snap shot增量算法,不同的物理执行计划的选择由Cost Space Model的统一框架支持。最上面是查询加速层,MaxComputeq 2.0,依赖Fuxi的Data Cache做一些全链路的优化,比如Codegen的Cache的优化,比如数据不同层次的meta的Cache、optimizer也针对于latency做很多优化,选择更倾向于用Hash的算法支持Query查询。最后Runtime也会做CMD优化,保证source关键的算子有更高的执行效率。
三、MaxCompute近实时关键技术
这部分介绍近实时升级后技术的细节。首先从右边的图可以看到Delta Table的数据管理,Compacted Table对应全量的数据,增量数据包含Delta 的数据和CDC数据。从SQL语法上,Delta Table要求用户在create Table指定pk列,同时tbleproperties也要指定transactional=true,不同的数据增量和全量的数据是由Storage Service后台做不同的转换,具体到每一行的数据,支持upsert数据的插入,upsert的语义是全量数据没有就是insert into的语义,如果是全量数据,它就会按pk列做去重,支持部分列的更新,从右下角可以看到为了读取性能,在pk上会分成很多个Bucket,不同的Bucket里面包括全量数据和增量数据是由统一的Manifest管理映射关系,Bucket是在Table原有的partition的力度下,是正交的功能,存储服务架构做很多的调整支撑增量数据,支持分钟级流之后,增量数据是很细小的数据流入到MC的存储中,所以有很多小文件对读取性能有很大的影响。最开始为加速写入的速度,写入的数据是用error的格式行存的格式存储的数据,数据会自动auto sort变成列存的数据。
有比较多的列存的数据,auto merge将小文件做合并,数据如果超过time travel的周期后,做compact消除掉多版本的数据,更大的节省存储的空间。Delta Table依赖于原数据做很好的transaction的支持,保证表的ACID的特性。所有的数据操作,无论parner的数据导入,还是SQL的数据的insert操作,又或者后台数据治理auto compact ,auto merge都是依赖于meta服务,收掉所有的meta的操作保证transaction。meta服务会有mata plan抽象所有的对于数据的读写的操作。同时meta服务是依赖于ots做数据的存储,相对于完全用文件,比如业界的湖上的Table Format,完全用文件表示所有的版本增量和全量数据的映射关系。依赖于ots可以做到更好的读取性能。Meta服务还服务于SQL的增量查询的能力,比如time travel,我们会要找到特定时刻SQL的读取的数据的版本,是依赖于meta服务圈定这时刻里边所有需要读取的文件的集合,这也都是由meta服务保证的,全量或者是历史快照或者增量查询都是依赖于meta服务收掉统一的数据,读取的文件的list。
下面到增量计算的部分,Delta Table更好的管理全量和增量的数据后,扩展SQL的语法,做更好的增量数据的用户的支持,Delta Table是支持SQL全套的能力,是支持insert update delete和moge into的语法,同时扩展新的语法,比如Time Travel,可以看到右下角Time Travel是支持查询五分钟前的历史的数据,它的语法就是select*from third Table,time是确认时刻。也支持增量的查询,增量的查询的语法可能是需要用户写between、start和and时间的间距,把这时间间隔之内的数据读取出来。另外也支持用SQL的语法用Table change语法查询不同版本之间的数据的变化,输出数据的格式就是标准的CDC的格式,其他的引擎可以依赖于格式做MC加工的数据的Serving。另外stream的update ,stream update是记录Table,但是它有记录和读写版本信息。如果这条语句是from stream之后,如果支持屏显,slide屏显from stream,它的版本信息不会移动。
如果stream在加工链路,它的数据要插入到新目标表之后,它的版本信息就会有变化,所以依赖于stream的update用户可以比较容易的建设增量查询的pipeline,依赖于SQL原有的比较好的查询Query的优化能力了很多Bucket,例子Delta Table的update,update是做DT基准表,要和data的数据做update,数据经过Query,会经过原有的plan的执行过程。主要的是由supertask生成plan,生成的执行plan之后到实际的Runtime执行,在Stage里边分成多个并发,写入不同的Bucket的数据。如果Delta Table是普通的表,需要完整的shuffle过程,如果Delta表是ACID的表,可以做很多Shuffle 的优化,对于更新的操作,只需要一个stage。
下面介绍增量物化视图。物化视图在阿里的内部和公共云都有广泛的应用,用户可以用物化视图做很好的重复计算的治理。但之前的物化视图的刷新都是全量数据的刷新,有Delta Table之后,如果物化视图是依赖Delta Table创建的,物化视图刷新也可以做到增量。增量物化视图的创建的语法和之前很类似,但是有增量的关键字,所以熟悉物化视图的用户是有很低的学习的门槛。另外也支持两种不同的增量计算的算法,增量计算算法可能是基于状态,也可能是基于原表的部分,都是由于统一的cosbase框架做plan选择。实际的刷新的过程会产生很多的stars,比如job执行过程需要读取多少数据,要执行运行多长时间,消耗多少个co的资源,这些所有的stars都会落到bigMeta里边,优化下一次类似的物化视图的刷新的plan,增量物化视图刷新的例子,首先有T1的Delta表,它里边有100行数据,key只有a和b,有不同的value,中间位置有物化视图的定义,在T1上做简单的聚合,是Groby 1个key算sum,最开始基于存量的数据100行数据物化视图的结果,是a和b这两行数据。如果有c的数据进来后,比如插入一条新的数据,pk是101,同时它的key是a,它的value是2,然后insert的操作,之后物化视图也要有刷新的操作,刷新的操作会产生增量刷新的结果,结果key是a,它的value是123,因为有一条value等于20的数据插入之后,它的value会更新为143对,这条数据是upsert类型,表示pk等于1行数据要被更新掉,如果下一行CDC数据,比如这里边的数据是像刚才的101行的value等于20的数据要更新成10。
从CDC表示,可能是两行的数据,是有删除的操作,有insert操作,根据pk判断和原有的表的数据做当前状态变成把update的数据对应从20变成10,数据要做在计算到sum的结果中,所以sum的结果会从143更新到133,所有的全量数据和这两次增量数据,最终用户去访问mv1,mv1就会是做merge的read,返回最终的结果。增量物化视图的增量刷新是支持层级,是基于mv之上还可以创建新的mv,周期性调度task是内部平台的通用的task的能力,比如是append Table的刷新、增量物化尺度刷新都依赖周期性调度的task,对task也做抽象,可以让开放给用户来使用,创建task的语句就是create test。同时可以指定调度的周期,同时也支持用户指定表达式,在什么情况下做触发。MCQ 2.0 是对MC做全新的升级,有强隔离的环境,同时管控链路也做了很多优化,依赖于Fuxi的Cache做了很多不同的数据的Cache,以提高查询的性能,执行也做很多的优化加速执行,在调度链路上,做很多模块的融合,以提升的查询的性能。原来很多SQL查询的链路会有很多RBC的调用,不同角色的融合可能会做到进程内的方法的调用。同时做到很多类型的Cache, mat Cache, plan Cache,result Cache。如果Query完全没有任何变化,直接复用result Cache,MCQ 2.0 还推出Cachet的方式,可以将shuffle的数据做很好的Cache,对shuffle的pipe做signature。
如果signature没有变化的情况,第二次执行可以用上次的shuffle的结果直接复用,plan就会改写掉。相应的计算都可以节省掉。相对于物化视图,物化视图可以理解成是一张表,物化视图刷新还是需要很多计算的资源,IRC相当于不需要对原来的作业执行任何的性能的侵入就能将shuffle数据用于下次作业的加速。在plan层面也提到intelligent tuning会面向于lantency做很多优化,有更实时的stage的反馈,plan也会更激进、更优先的选择Hash的算法做优化,会应用Shuffle remove优化的能力,同时感知到Virtual warehouse资源占用情况,动态的调整作业的并发度。MCQ 2.0相对MCQ 1.0是有一倍的性能提升。右边图是在20亿的表上做聚合,没有任何的filter条件,能秒级的方式返回。MCQ 2.0的性能在512CU的情况下处理1TB的TPC-DS数据,平均Query的时间是4.7秒。同时看到最高的是TPC-DS里最复杂的Query,同时多个Query大并发的情况下,主要依赖stage调度的方式,多个Query大并发的情况下仍然稳定的表现。
四、MaxCompute近实时展望
最后一部分是近实时的展望,三个部分在近实时方面做更多的能力的升级,首先是Delta Table后续会支持秒级的数据导入,同时会在pip表的基础扩展append Table。增量计算的部分也会通过增量计算和其他引擎做更好的融合。在MCQ 2.0方面,现在正在进行Pipeline的Runtime的重大重构,重构之后会更好的从近程模型切换到多线程的模型,会更好的利用Virtual warehouse中的多核的计算资源,同时会继续丰富Cache的能力。另外也会将MCQ 2.0和Maxframe做更深度的融合,让Maxframe做大数据AI操作的体验会更顺滑。