1.引言
本文主要从四部分介绍,阿里云云原生大数据计算服务MaxCompute湖仓一体近实时增量处理技术架构的核心设计和应用场景。
2.MaxCompute 湖仓一体发展进程
MaxCompute作为阿里云自研的海量大数据处理平台已经有十几年的发展历史,在规模和扩展性方面一直表现比较优秀。其依托阿里云飞天分布式操作系统,能够提供快速,完全托管的EB级数据仓库及数据湖解决方案,可经济高效的处理海量数据。目前,其承担着阿里集团绝大部分离线数据存储和计算力,是阿里云产品矩阵中最重要的自研核心平台之一。
MaxCompute发展之初,主要聚焦数仓方面的大数据处理业务场景,并且处理的数据源主要为格式化数据。随着数据处理场景的多样化和业界数据湖架构的兴起,加上阿里集团内部本身数据也非常多,支持多样化数据源也就成为了一个必选项。因此MaxCompute 设计了完善的外表机制,可以读取存储在外部多种格式的数据对象,例如Hadoop开源体系,OSS半结构化或非结构化数据,为此也尽可能设计开发统一的元数据处理架构,此阶段MaxCompute 在湖仓一体化解决方案中迈出了重要一步,极大的扩展了数据处理的业务场景,有效的打破数据孤岛,联动各方面的数据进行综合分析来挖掘整体数据价值。但时效性不足,通常是T+1离线场景。
随着用户数和数据规模不断增加,很多业务场景也越加复杂,需要更加完善综合的整体解决方案。其中的关键环节之一就是数据需要更加高效的流转起来,为此MaxCompute 进一步设计完善开放存储和计算架构,更好的去融合生态,让数据可流畅的进得来也出得去。此外,还有一个重要的业务场景是大规模批量处理和高时效高效率增量处理一体化解决方案,为简化用户数据处理链路,节省不同系统之间的数据迁移成本以及冗余计算和存储成本,MaxCompute 团队设计开发了MaxCompute离线和近实时增量处理的一体化架构。总体来说,现阶段以及未来会基于统一的存储、统一的元数据、统一的计算引擎有效支撑湖仓一体的整体技术架构,让数据能够开放互通高效流转,并且计算和存储成本持续优化。
3.MaxCompute近实时增量处理技术架构简介
3.1MaxCompte离线 &近实时增量处理业务系统架构现状
随着当前数据处理的业务场景日趋复杂,对于时效性要求低的大规模数据全量批处理的场景,直接使用MaxCompute 足以很好的满足业务需求,对于时效性要求很高的秒级实时数据处理或者流处理,则需要使用实时系统或流系统来满足需求。
但其实对于大部份业务场景,并不要求秒级数据更新可见,更多的是分钟级或者小时级的增量数据处理场景,并且叠加海量数据批处理场景。
对于这类业务场景的解决方案,如果使用单一的MaxCompute 离线批量处理链路,为了计算的高效性,需要将用户各种复杂的一些链路和处理逻辑转化成T+1的批次处理,链路复杂度增加,也可能产生冗余的计算和存储成本,且时效性也较差。但如果使用单一的实时系统,资源消耗的成本比较高,性价比也较低,并且大规模数据批处理的稳定性也不足。因此当前比较典型的解决方案是Lambda架构,全量批处理使用MaxCompute链路,时效性要求比较高的增量处理使用实时系统链路,但该架构也存在大家所熟知的一些固有缺陷,比如多套处理和存储引擎引发的数据不一致问题,多份数据冗余存储和计算引入的额外成本,架构复杂以及开发周期长等。
针对这些问题近几年大数据开源生态也推出了各种解决方案,最流行的就是Spark/Flink/Presto开源数据处理引擎,深度集成开源数据湖Hudi、Delta Lake和Iceberg三剑客,来综合提供解决方案,解决Lamdba架构带来的一系列问题,而MaxCompute近一年自研开发的离线近实时增量处理一体化架构,同样是为了解决这些问题而设计,不仅仅具备分钟级的增全量数据读写以及数据处理的业务需求,也能提供Upsert,Timetravel等一系列实用功能,可大幅扩展业务场景,并且有效的节省数据计算,存储和迁移成本,切实提高用户体验。下文就将介绍该技术架构的一些典型的功能和设计。
3.2MaxCompute近实时增量处理技术架构
MaxCompute近实时增量处理整体架构的设计改动主要集中在五个模块:数据接入、计算引擎、数据优化服务,元数据管理,数据文件组织。其他部份直接复用MaxCompute 已有的架构和计算流程,比如数据的分布式存储直接集成了阿里云基础设施盘古服务。
- 数据接入主要支持各种数据源全量和近实时增量导入功能。MaxCompute 联合相关产品定制开发多种数据接入工具,例如MaxCompute 定制开发的Flink Connector,DataWorks的数据集成等,用来支持高效的近实时增量数据导入。这些工具会对接MaxCompute 的数据通道服务Tunnel Server,主要支持高并发分钟级增量数据写入。此外,也支持MaxCompute SQL,以及其它一些接口用于支持全量数据高效写入。
- 计算引擎主要包含MaxCompute 自研的SQL引擎,负责Timetravel和增量场景下的SQL DDL/DML/DQL的语法解析,优化和执行链路。此外,MaxCompute内部集成的Spark等引擎也在设计开发支持中。
- 数据优化服务主要由MaxCompute 的Storage Service来负责智能的自动管理增量数据文件,其中包括小文件合并Clustering,数据Compaction,数据排序等优化服务。对于其中部分操作,Storage Service会根据数据特征,时序等多个维度综合评估,自动执行数据优化任务,尽可能保持健康高效的数据存储和计算状态。
- 元数据管理主要负责增量场景下数据版本管理,Timetravel管理,事务并发冲突管理,元数据更新和优化等。
- 数据文件组织主要包含对全量和增量数据文件格式的管理以及读写相关的模块。
4.核心设计解剖
4.1统一的数据文件组织格式
要支持全量和增量处理一体化架构首先需要设计统一的表类型以及对应的数据组织格式,这里称为Transactional Table2.0,简称TT2,基本可以支持普通表的所有功能,同时支持增量处理链路的新场景,包括timetravel查询、upsert操作等。
TT2要生效只需要在创建普通表时额外设置主键primary key(PK),以及表属性transactional为true即可。PK列用于支持Upsert链路功能,PK值相同的多行记录在查询或者Compaction会merge成一行数据,只保留最新状态。transactional属性则代表支持ACID事务机制,满足读写快照隔离,并且每行数据会绑定事务属性,比如事务timestamp,用来支持timetravel查询,过滤出正确数据版本的记录。此外TT2的tblproperties还可以设置其他的一些可选的表属性,比如write.bucket.num用来配置数据写入的并发度,acid.data.retain.hours用来配置历史数据的有效查询时间范围等。
TT2表数据文件存在多种组织格式用来支持丰富的读写场景。其中base file数据文件不保留Update/Delete中间状态,用来支撑全量批处理的读写效率,delta file增量数据文件会保存每行数据的中间状态,用于满足近实时增量读写需求。
为了进一步优化读写效率,TT2支持按照BucketIndex对数据进行切分存储,BucketIndex数据列默认复用PK列,bucket数量可通过配置表属性write.bucket.num指定,数据写入的高并发可通过bucket数量水平扩展,并且查询时,如果过滤条件为PK列,也可有效的进行Bucket裁剪查询优化。数据文件也可按照PK列进行排序,可有效提升MergeSort的效率,并有助于DataSkipping查询优化。数据文件会按照列式压缩存储,可有效减少存储的数据量,节省成本,也可有效的提升IO读写效率。
4.2数据近实时流入
前面介绍了统一的数据组织格式,接下来需要考虑数据如何高效写入TT2。
数据流入主要分成近实时增量写入和批量写入两种场景。这里先描述如何设计高并发的近实时增量写入场景。用户的数据源丰富多样,可能存在数据库,日志系统或者其他消息队列等系统中,为了方便用户迁移数据写入TT2, MaxCompute定制开发了Flink Connector、Dataworks数据集成以及其它开源工具,并且针对TT2表做了很多专门的设计开发优化。这些工具内部会集成MaxCompute 数据通道服务Tunnel提供的客户端SDK,支持分钟级高并发写入数据到Tunnel Server,由它高并发把数据写入到每个Bucket的数据文件中。
写入并发度可通过前面提及的表属性write.bucket.num来配置,因此写入速度可水平扩展。对同一张表或分区的数据,写入数据会按pk值对数据进行切分,相同pk值会落在同一个bucket桶中。此外,数据分桶的好处还有利于数据优化管理操作例如小文件clustering,compaction等都可以桶的粒度来并发计算,提高执行效率。分桶对于查询优化也非常有好处,可支持bucket裁剪、shuffle move等查询优化操作。
Tunnel SDK提供的数据写入接口目前支持upsert和delete两种数据格式,upsert包含insert / update两种隐含语义,如数据行不存在就代表insert,如已存在就代表update。commit接口代表原子提交这段时间写入的数据如返回成功就代表写入数据查询可见,满足读写快照隔离级别,如返回失败,数据需要重新写入。
4.3SQL批量写入
批量导入主要通过SQL进行操作。为了方便用户操作,实现了操作TT2所有的DDL / DML语法。SQL引擎内核模块包括Compiler、Optimizer、Runtime等都做了大量改造开发以支持相关功能,包括特定语法的解析,特定算子的Planner优化,针对pk列的去重逻辑,以及runtime构造Upsert格式数据写入等。数据计算写入完成之后,会由Meta Service来原子性更新Meta信息,此外,也做了大量改造来支持完整的事务机制保证读写隔离、事务冲突检测等等。
4.4小数据文件合并
由于TT2本身支持分钟级近实时增量数据导入,高流量场景下可能会导致增量小文件数量膨胀,从而引发存储访问压力大、成本高,并且大量的小文件还会引发meta更新以及分析执行慢,数据读写IO效率低下等问题,因此需要设计合理的小文件合并服务, 即Clustering服务来自动优化此类场景。
Clustering服务主要由MaxCompute 内部的Storage Service来负责执行,专门解决小文件合并的问题,需要注意的是,它并不会改变任何数据的历史中间状态,即不会消除数据的Update/Delete中间状态。
结合上图可大概了解Clustering服务的整体操作流程。Clustering策略制定主要根据一些典型的读写业务场景而设计,会周期性的根据数据文件大小,数量等多个维度来综合评估,进行分层次的合并。Level0到Level1主要针对原始写入的Delta小文件(图中蓝色数据文件)合并为中等大小的Delta文件(图中黄色数据文件),当中等大小的Delta文件达到一定规模后,会进一步触发Level1到Level2的合并,生成更大的Delta文件(图中橙色数据文件)。
对于一些超过一定大小的数据文件会进行专门的隔离处理,不会触发进一步合并,避免不必要的读写放大问题,如图中Bucket3的T8数据文件。超过一定时间跨度的文件也不会合并,因为时间跨度太大的数据合并在一起的话,当TimeTravel或者增量查询时,可能会读取大量不属于此次查询时间范围的历史数据,造成不必要的读放大问题。
由于数据是按照BucketIndex来切分存储的,因此Clustering服务会以bucket粒度来并发执行,大幅缩短整体运行时间。
Clustering服务需要和Meta Service进行交互,获取需要执行此操作的表或分区的列表,执行结束之后,会把新老数据文件的信息传入Meta Service,它负责Clustering操作的事务冲突检测,新老文件meta信息原子更新、老的数据文件回收等。
Clustering服务可以很好的解决大文件数量膨胀引发的一系列效率低下的读写问题,但不是频率越高越好,执行一次也会消耗计算和IO资源,至少数据都要全部读写一遍,存在一定的读写放大问题。因此执行策略的选择尤其重要,所以目前暂时不会开放给用户手动执行,而是引擎根据系统状态智能自动触发执行,可保障Clustering服务执行的高效率。
4.5数据文件Compaction
除了小文件膨胀问题需要解决外,依然还有一些典型场景存在其它问题。TT2支持update、delete格式的数据写入,如果存在大量此格式的数据写入,会造成中间状态的冗余记录太多,引发存储和计算成本增加,查询效率低下等问题。因此需要设计合理的数据文件compaction服务优化此类场景。
Compaction服务主要由MaxCompute 内部的Storage Service来负责执行,既支持用户手动执行SQL语句触发、也可通过配置表属性按照时间频率、Commit次数等维度自动触发。此服务会把选中的数据文件,包含base file和delta file,一起进行Merge,消除数据的Update / Delete中间状态,PK值相同的多行记录只保留最新状态的一行记录,最后生成新的只包含Insert格式的base file。
结合上图可大概了解Compaction服务的整体操作流程。t1到t3时间段,一些delta files写入进来,触发compaction操作,同样会以bucket粒度并发执行,把所有的delta files进行merge,然后生成新的base file。之后t4和t6时间段,又写入了一批新的delta files,再触发compaction操作,会把当前存在的base file和新增的delta files一起做merge操作,重新生成一个新的base file。
Compaction服务也需要和Meta Service进行交互,流程和Clustering类似,获取需要执行此操作的表或分区的列表,执行结束之后,会把新老数据文件的信息传入Meta Service,它负责Compaction操作的事务冲突检测,新老文件meta信息原子更新、老的数据文件回收等。
Compaction服务通过消除数据中间历史状态,可节省计算和存储成本,极大加速全量快照查询场景的效率,但也不是频率越高越好,首先执行一次也要读取一遍全量数据进行Merge,极大消耗计算和IO资源,并且生成的新base file也会占据额外的存储成本,而老的delta file文件可能需要用于支持timetravel查询,因此不能很快删除,依然会有存储成本,所以Compaction操作需要用户根据自己的业务场景和数据特征来合理选择执行的频率,通常来说,对于Update / Delete格式的记录较多,并且全量查询次数也较多的场景,可以适当增加compaction的频率来加速查询。
4.6事务管理
以上主要介绍了典型的数据更新操作,而它们的事务并发管理都会统一由Meta Service进行控制。
上面表格详细展示了各个具体操作并发执行的事物冲突规则。Meta服务采用了经典的MVCC模型来满足读写快照隔离,采用OCC模型进行乐观事务并发控制。对于一些高频的操作单独设计优化了事务冲突检测和重试机制,如clustering操作和insert into 并发执行,即使事务Start和Commit时间出现交叉也不会冲突失败,都能成功执行,即使在原子提交Meta信息更新时出现小概率失败也可在Meta层面进行事务重试,代价很低,不需要数据重新计算和读写。
此外,各种数据文件信息以及快照版本也需要有效的管理,其中包含数据版本、统计信息、历史数据、生命周期等等。对于TimeTravel和增量查询,Meta层面专门进行了设计开发优化,支持高效的查询历史版本和文件信息。
4.7TimeTravel查询
基于TT2,计算引擎可高效支持典型的业务场景TimeTravel查询,即查询历史版本的数据,可用于回溯历史状态的业务数据,或数据出错时,用来恢复历史状态数据进行数据纠正,当然也支持直接使用restore操作恢复到指定的历史版本。
对于TimeTravel查询,会首先找到要查询的历史数据版本之前最近的base file,再查找后面的delta files,进行合并输出,其中base file可以用来加速查询读取效率。
这里结合上图进一步描述一些具体的数据查询场景。比如创建一TT2表,schema包含一个pk列和一个val列。左边图展示了数据变化过程,在t2和t4时刻分别执行了compaction操作,生成了两个base file: b1和b2。b1中已经消除了历史中间状态记录(2,a),只保留最新状态的记录 (2,b)。
如查询t1时刻的历史数据,只需读取delta file (d1)进行输出; 如查询t2时刻,只需读取base file (b1) 输出其三条记录。如查询t3时刻,就会包含base file ( b1)加上delta file (d3)进行合并输出,可依此类推其他时刻的查询。
可见,base文件虽可用来加速查询,但需要触发较重的compaction操作,用户需要结合自己的业务场景选择合适的触发策略。
TimeTravel可根据timestamp和version两种版本形态进行查询,除了直接指定一些常量和常用函数外,我们还额外开发了get_latest_timestamp和get_latest_version两个函数,第二个参数代表它是最近第几次commit,方便用户获取我们内部的数据版本进行精准查询,提升用户体验。
4.8增量查询
此外,SQL增量查询也是重点设计开发的场景,主要用于一些业务的近实时增量处理链路,新增SQL语法采用between and关键字,查询的时间范围是左开右闭,即begin是一个开区间,必须大于它,end是一个闭区间。
增量查询不会读取任何base file,只会读取指定时间区间内的所有delta files,按照指定的策略进行Merge输出。
通过上诉表格可进一步了解细节,如begin是t1-1,end是t1,只读取t1时间段对应的delta file (d1)进行输出, 如果end是t2,会读取两个delta files (d1和d2);如果begin是t1,end是t2-1,即查询的时间范围为(t1, t2),这个时间段是没有任何增量数据插入的,会返回空行。
对于Clustering和Compaction操作也会产生新的数据文件,但并没有增加新的逻辑数据行,因此这些新文件都不会作为新增数据的语义,增量查询做了专门设计优化,会剔除掉这些文件,也比较贴合用户使用场景。
4.9历史版本数据回收
由于Timetravel和增量查询都会查询数据的历史状态,因此需要保存一定的时间,可通过表属性acid.data.retain.hours来配置保留的时间范围。如果历史状态数据存在的时间早于配置值,系统会开始自动回收清理,一旦清理完成,TimeTravel就查询不到对应的历史状态了。回收的数据主要包含操作日志和数据文件两部分。
同时,也会提供purge命令,用于特殊场景下手动触发强制清除历史数据。
4.10数据接入生态集成现状
初期上线支持接入TT2的工具主要包括:
- DataWorks数据集成:支持数据库等丰富的数据源表全量以及增量的同步业务。
- MaxCompute Flink Connector:支持近实时的upsert数据增量写入,这一块还在持续优化中,包括如何确保Exactly Once语义,如何保障大规模分区写入的稳定性等,都会做深度的设计优化。
- MaxCompute MMA:支持大规模批量 Hive数据迁移。很多业务场景数据迁移可能先把存在的全量表导入进来,之后再持续近实时导入增量数据,因此需要有一些批量导入的工具支持。
- 阿里云实时计算Flink版Connector:支持近实时Upsert数据增量写入,功能还在完善中。
- MaxCompute SDK:直接基于SDK开发支持近实时导入数据,不推荐
- MaxCompute SQL:通过SQL批量导入数据
对其它一些接入工具,比如Kafka等,后续也在陆续规划支持中。
4.11特点
作为一个新设计的架构,MaxCompute 会尽量去覆盖开源数据湖(HUDI / Iceberg)的一些通用功能,有助于类似业务场景的用户进行数据和业务链路迁移。此外,MaxCompute离线 & 近实时增量处理一体化架构还具备一些独特的亮点:
- 统一的存储、元数据、计算引擎一体化设计,做了非常深度和高效的集成,具备存储成本低,数据文件管理高效,查询效率高,并且Timetravel / 增量查询可复用MaxCompute批量查询的大量优化规则等优势。
- 全套统一的SQL语法支持,非常便于用户使用。
- 深度定制优化的数据导入工具,支持一些复杂的业务场景。
- 无缝衔接MaxCompute现有的业务场景,可以减少迁移、存储、计算成本。
- 完全自动化管理数据文件,保证更好的读写稳定性和性能,自动优化存储效率和成本。
- 基于MaxCompute平台完全托管,用户可以开箱即用,没有额外的接入成本,功能生效只需要创建一张新类型的表即可。
- 作为完全自研的架构,需求开发节奏完全自主可控。
5.应用实践与未来规划
5.1离线 & 近实时增量处理一体化业务架构实践
基于新架构,MaxCompute可重新构建离线 & 近实时增量处理一体化的业务架构,即可以解决大部分的Lambda架构的痛点,也能节省使用单一离线或者实时系统架构带来的一些不可避免的计算和存储成本。各种数据源可以方便的通过丰富的接入工具实现增量和离线批量导入,由统一的存储和数据管理服务自动优化数据编排,使用统一的计算引擎支持近实时增量处理链路和大规模离线批量处理链路,而且由统一的元数据服务支持事务和文件元数据管理。它带来的优势非常显著,可有效避免纯离线系统处理增量数据导致的冗余计算和存储,也能解决纯实时系统高昂的资源消耗成本,也可消除多套系统的不一致问题和减少冗余多份存储成本以及系统间的数据迁移成本,其他的优势可以参考上图,就不一一列举了。总体而言,就是使用一套架构既可以满足增量处理链路的计算存储优化以及分钟级的时效性,又能保证批处理的整体高效性,还能有效节省资源使用成本。
5.2未来规划
最后再看一下未来一年内的规划:
- 持续完善SQL的整体功能支持,降低用户接入门槛;完善Schema Evolution支持。
- 更加丰富的数据接入工具的开发支持,持续优化特定场景的数据写入效率。
- 开发增量查询小任务分钟级别的pipeline自动执行调度框架,极大的简化用户增量处理链路业务的开发难度,完全自动根据任务执行状态触发pipeline任务调度,并自动读取增量数据进行计算。
- 持续继续优化SQL查询效率,以及数据文件自动优化管理。
- 扩展生态融合,支持更多的第三方引擎读写TT2。
新架构目前还没有在MaxCompute最新的对外版本推出,大概6-7月份我们将对外发布邀测使用,大家可以通过关注MaxCompute官网了解相关进展。也欢迎大家加入MaxCompute开发者钉钉群,与我们直接沟通。
6.Q&A
Q1:Bucket数量的设置与commit间隔以及compaction间隔设置的最佳推荐是什么?
A1:Bucket数量与导入的数据量相关,数据量越大,建议设置的bucket数量多一些,在批量导入的场景,推荐每个bucket的数据量不要超过1G,在近实时增量导入场景,也要根据Tunnel的可用资源以及QPS流量情况来决定bucket数量。对于commit的间隔虽然支持分钟级数据可见,但如果数据规模较大,bucket数量较多,我们推荐间隔最好在五分钟以上,也需要考虑结合 Flink Connector的checkpoint机制来联动设置commit频率,以支持Exactly Once语义,流量不大的话,5~10分钟间隔是推荐值。Compaction间隔跟业务场景相关,它有很大的计算成本,也会引入额外的base file存储成本,如果对查询效率要求比较高且比较频繁,compaction需要考虑设置合理的频率,如果不设置,随着delta files和update记录的不断增加,查询效率会越来越差。
Q2:会不会因为commit太快,compaction跟不上?
A2:Commit频率和Compaction频率没有直接关系,Compaction会读取全量数据,所以频率要低一些,至少小时或者天级别,而Commit写入增量数据的频率是比较快的,通常是分钟级别。
Q3:是否需要专门的增量计算优化器?
A3:这个问题很好,确实需要有一些特定的优化规则,目前只是复用我们现有的SQL优化器,后续会持续规划针对一些特殊的场景进行增量计算的设计优化。
Q4:刚刚说会在一两个月邀测MaxCompute新架构,让大家去咨询。是全部替换为新的架构还是上线一部分的新架构去做些尝试,是要让用户去选择吗?还是怎样?
A4:新技术架构对用户来说是透明的,用户可以通过MaxCompute无缝接入使用,只需要创建新类型的表即可。针对有这个需求的新业务或者之前处理链路性价比不高的老业务,可以考虑慢慢切换到这条新链路尝试使用。
【MaxCompute发布免费试用计划,为数仓建设提速】新用户可0元领取5000CU*小时计算资源与100GB存储,有效期3个月。立即领取>>