概述
Paimon 和 MaxCompute 的对接,很早就开始。虽然一直是可用状态,但是经常因为性能对比于内表或者其他外表不足,而导致客户在使用上十分艰难。为此,Paimon 团队和 MaxCompute 团队进行了半年的紧密合作,专门为提高 Paimon 在 MaxCompute 上的性能进行了专项优化研发。本文将从过去 MaxCompute 读写慢的原因,双方团队合作进行了哪些优化以及最后优化效果三个方面构文,叙述最新进展。
MaxCompute 外表架构升级(快)
MaxCompute 是纯 c++ 代码结构,为了跟 Hive 生态保持一定的兼容性,读写外表一般通过 Hive 的读写接口。此接口,通过 next 方法获取下一条消息数据。在大数据领域,绝大多数场景都是使用列存或行列混合方式来进行数据存储。因此,从 Paimon sdk 读取到的数据,再经由 MaxCompute 消费就会存在以下过程:
MaxCompute 调用 Paimon sdk 读取数据,数据存在 Paimon 的缓冲中,以列存的形式存储 --> Maxcompute 通过 "per row" 接口,一行一行地从 Paimon 缓存中读取数据 --> MaxCompute 再将数据由行式转化为列式进行向量化计算。数据流转上,从列式读取,转换成行,再转换成列,消耗了巨量的 CPU 资源。
为此,团队设计了新的基于 Arrow 的接口,使得列存数据能够方便地在 java 和 c++ 之间进行传递。从 Paimon 读取到的数据,只需要进过一次列式拷贝,到 Arrow 的数据结构中,然后直接通过 Arrow 的跨语言特性传递给 MaxCompute 进行使用即可。当然,这一步的前提是,不需要经过主键 merge。如果是 Paimon 主键表,依旧需要使用行的形式进行数据去重,再通过 Arrow 接口传递给 MaxCompute 使用。
小结:
- 非主键表:可以直接 Arrow 化处理,一次列式拷贝即可,大幅节约 CPU 资源。
- 主键表:仍需行级别去重,无法达到与非主键表完全一致的性能,但也在新的框架下有所提升。
新旧接口对比
除此之外,新接口实现了在 MaxCompute 上直接写 Paimon 外表。Paimon 通过实现 native writer,已经直接适配 MaxCompute 内部的 aliorc 格式。
Paimon sdk 内置 (更快)
过去,MaxCompute 内置了 hudi 等其他的 sdk,但是 MaxCompute 通过 plugin 机制置放 Paimon sdk (基于某些沟通没到位的问题)。导致,每次启动 MaxCompute 的 worker pod,都需要从远方拉一遍 Paimon sdk 的包,且通过 plugin load 的方式进行加载。同时,因为Paimon jar是用户自定义的jar包,从安全角度考虑必须以子进程+子容器方式被拉起和执行,然后由父进程主动与子进程通信并基于TCP/Domain Socket等方式完成数据交换,拉起进程的耗时受整个集群的CPU水位影响,数据交换方式也由内部调用变成了协议传输,大大增加了时间消耗。因此,每个有关 Paimon 的 task,都必须要有 30s 左右的跟任务完全无关的时间消耗。且随着数据量和集群压力的增加,这个消耗会更多(理论上应该保持不变,但是测下来会更久)。
因此与MC技术团队沟通后,MaxCompute 团队内置了 Paimon sdk。Paimon sdk 的发版会受到 MaxCompute 团队发版的限制,提升了性能,增强了用户体验。
native direct read && write (无敌快)
为了让 Paimon 的读性能媲美内表,我们进行了更近一步的读写优化。当 MaxCompute 方收到的 Paimon split 数据,不需要经过 Paimon sdk merge 的时候,直接利用 MaxCompute 自身读引擎进行数据读取。对于非主键表,直接媲美 MaxCompute 内表的读取速度和资源消耗。
同时,Paimon 侧设计了新的 Arrow 批量写接口。从 MaxCompute 获取到的 Arrow batch 数据,将直接通过 Paimon sdk 写入到 Paimon 表,无需经过任何行列转换或者数据拷贝。Paimon 通过设计新的 Paimon-jni 模块,直接在底层通过 c++ Arrow 写入 Arrow batch 数据,效果无异于 MaxCompute 直写。(此优化依旧只针对非主键表,主键表需要内存中 merge)
为了进一步加速 MaxCompute 的写,Paimon 侧还进行了异步 setup 优化(MaxCompute 异步调用 Paimon 初始化,同时进行数据准备),列式分拣动态分区等多种优化,进一步提升 MaxCompute 对于 Paimon 的读写性能。
这套双方团队的新架构,已经成功经过了淘宝天猫双十一的巨量流量验证,并且取得了巨大的成功。
性能测试
首先,我们测试在 MaxCompute 上对 Paimon 外表,hudi 外表,以及 MaxCompute 内表的读取结果。
环境构建
首先,在共有云上开通 dlf、vvp、MaxCompute 以及 oss 服务。为了构建测试环境,我们通过 vvp 创建 dlf catalog,利用 catalog 建表,并通过 vvp 流任务写入数据。
构建 catalog:
-- my-catalog 也可以改为自定义的英文名称CREATE CATALOG `my-catalog` WITH ( 'type' = 'paimon', 'metastore' = 'dlf', 'warehouse' = '<warehouse>', 'dlf.catalog.id' = '<dlf.catalog.id>', 'dlf.catalog.accessKeyId' = '<dlf.catalog.accessKeyId>', 'dlf.catalog.accessKeySecret' = '<dlf.catalog.accessKeySecret>', 'dlf.catalog.endpoint' = '<dlf.catalog.endpoint>', 'dlf.catalog.region' = '<dlf.catalog.region>', 'fs.oss.endpoint' = '<fs.oss.endpoint>', 'fs.oss.accessKeyId' = '<fs.oss.accessKeyId>', 'fs.oss.accessKeySecret' = '<fs.oss.accessKeySecret>');
建表(举例):
CREATE TABLE `dlf-lakehouse`.`my_db`.ods_sell_meta_stream ( pt INT, `user_id` BIGINT, sell_id BIGINT, sell_time TIMESTAMP, sell_name STRING, sell_nick_name STRING, sell_phone STRING, sell_number STRING, sell_position STRING) PARTITIONED BY (pt) WITH ( 'file.format' = 'parquet');
利用 vvp etl 写入测试数据:
INSERT INTO `dlf-lakehouse`.`my_db`.ods_sell_meta_stream SELECT * FROM KafkaTable;
接着通过 odps 的 dlf 创建引导,开通和 dlf 打通的 odps 项目(需要绑定 odps 数据源到 DataWorks 项目开发中心), 具体操作可见 MaxCompute对应文档[1]。
读测试
对于 hudi 表,我们使用了 COPY_ON_WRITE 的写入方式,以达到最好的读取性能。hudi 表的 schema 如下(hudi 必须要定义 uudi 才能导入 dlf,导致配置了只进行 insert 操作):
CREATE TEMPORARY TABLE flink_hudi_tbl ( `uuid` STRING PRIMARY KEY NOT ENFORCED, `ts` STRING, pt INT, `user_id` BIGINT, sell_id BIGINT, sell_time STRING, sell_name STRING, sell_nick_name STRING, sell_phone STRING, sell_number STRING, sell_position STRING) WITH ( 'connector' = 'hudi', 'oss.endpoint' = 'oss-cn-hangzhou-internal.aliyuncs.com', 'accessKeyId' = 'xx', 'accessKeySecret' = 'xx', 'path' = 'oss://<path>', 'table.type' = 'COPY_ON_WRITE', 'hive_sync.enable' = 'true', 'hive_sync.mode' = 'hms', 'hive_sync.db' = 'flink_hudi', 'hive_sync.table' = 'flink_hudi_tbl', 'dlf.catalog.region' = 'cn-hangzhou', 'dlf.catalog.endpoint' = 'dlf-share.cn-hangzhou.aliyuncs.com', 'compaction.async.enabled' = 'false', 'write.operation' = 'insert', 'index.global.enabled' = 'false');
对于 Paimon 表,我们使用如下 schema 结构 (不需要任何参数):
CREATE TABLE `dlf-lakehouse`.`my_db`.ods_sell_meta_stream_write_none_native ( `uuid` STRING, `ts` STRING, pt INT, `user_id` BIGINT, sell_id BIGINT, sell_time TIMESTAMP, sell_name STRING, sell_nick_name STRING, sell_phone STRING, sell_number STRING, sell_position STRING);
表基本信息:
条数:66140055
每条数据大小 2k 以上
测试结果如下:
sql1
select max(sell_nick_name) from ods_sell_meta_stream_write_none_native;
新老 Paimon 接口在此 sql 下性能对比:
sql2(对比双字段带过滤条件查询)
select user_id, max(sell_nick_name) from ods_sell_meta_stream_write_none_native where user_id > 100 and user_id < 200 group by user_id;
写测试
因为 MaxCompute 不支持写 hudi,因此仅对比 MaxCompute 写 Paimon 和写内表。以下为两次静态分区写的平均消耗对比。
表特征:
1.2亿数据量,每条数据 3k 大小左右
sql:
INSERT INTO ods_sell_meta_stream_write_none_native3 partition (pt=1) select user_id,sell_id,sell_time,sell_name,sell_nick_name,sell_phone,sell_number,sell_position from ods_sell_meta_stream_write_none_native where pt = 1;
性能实测下来,接近 MaxCompute 内表。
相比于 MaxCompute 上 parquet、orc 外表
sql:
select user_id, max(sell_nick_name) from ods_sell_meta_stream_write_none_native where user_id >
sql:
select max(sell_nick_name) from ods_sell_meta_stream_write_none_native;
sql:
INSERT INTO <target> partition (pt=1) select user_id,sell_id,sell_time,sell_name,sell_nic
在当前版本,MaxCompute 对 Paimon 表的读写速度,快于对 parquet、orc 外表的直接读写速度,且读对比下, Paimon 比其他外表快了 10 倍以上。
总结
通过 Paimon 和 MaxCompute 团队共同努力,我们有以下功能或实战亮点:
- Paimon 在 MaxCompute 上实现了原生读写能力。
- 性能接近 MaxCompute 内表,多倍于传统外表。
- 减少中间拷贝与转换,显著降低 CPU 开销与延迟。
- 已通过双十一实战验证,稳定可靠
欢迎大家使用和体验!
作者简介 PROFILE
叶俊豪 (仟弋)
Apache Paimon Committer, 阿里云表存储团队核心研发
王烨 (萌豆)
阿里云 MaxCompute 团队核心研发
[1]. MaxCompute对应文档