AutoMQ x OSS 的 Iceberg 数据入湖的最佳实践

本文涉及的产品
对象存储 OSS,20GB 3个月
对象存储 OSS,内容安全 1000 次 1年
对象存储 OSS,恶意文件检测 1000次 1年
简介: 在数据湖技术生态中,Apache Iceberg凭借其开放性设计已确立事实标准地位。该技术不仅获得全球企业广泛采用,还构建了包含Apache Spark、Amazon Athena、Presto等主流计算引擎的完整生态系统。

【阅读原文】戳:AutoMQ x OSS 的 Iceberg 数据入湖的最佳实践

背景

 

 

 

在数字化转型进程中,用户交互行为产生的多维度数据已成为企业的重要战略资产。以短视频平台为例,基于用户点赞事件的实时推荐算法能显著提升用户活跃度和平台粘性。这类实时数据主要通过Apache Kafka流处理平台进行传输,通过其扇出(Fanout)机制实现多业务系统的并行消费。企业的数据应用需求呈现双重特性:一方面需要实时流处理能力,另一方面需要依托历史数据进行多维聚合分析。大数据分析技术经过多年演进,已从传统数据仓库架构发展为现代数据湖体系。

 

在数据湖技术生态中,Apache Iceberg凭借其开放性设计已确立事实标准地位。该技术不仅获得全球企业广泛采用,还构建了包含Apache Spark、Amazon Athena、Presto等主流计算引擎的完整生态系统。2024年AWS re:Invent大会上,基于Iceberg格式的S3 Tables服务正式发布,标志着云原生数据湖解决方案进入新阶段。

 

以Apache Kafka、数据湖平台、Apache Iceberg表格式为核心的现代化数据湖架构已成为新趋势。随之而来的挑战包括:

 

高效数据写入:数据写入模式和分区策略直接影响查询效率

运维、架构和管理复杂度提升:Apache Kafka的流数据不感知schema,需要经过处理和转化才能以Iceberg表格式存储到数据湖中。这带来了元数据管理、schema演进以及流转表数据处理任务管理等新挑战。

 

本文将从三个维度展开论述:首先分析Iceberg的技术优势及其成为行业标准的原因,其次详细阐述数据入湖的最佳实践方法,最后重点介绍AutoMQ如何利用阿里云OSS高效解决Kafka数据入湖问题。通过AutoMQ和阿里云服务的结合,用户可以轻松实现Kafka数据入湖的最佳实践。

 

小贴士:AutoMQ是构建在对象存储上的新一代Kafka,能实现秒级自动弹性并显著降低成本,目前服务于吉利汽车、京东、知乎、小红书、Grab等知名企业。作为阿里云的优秀合作伙伴,AutoMQ可通过阿里云市场直接订阅部署。

 

 

 

 

Iceberg的优势

 

 

 

ACID事务

 

 

在并发控制机制方面,Iceberg采用基于快照隔离的乐观并发控制(Optimistic Concurrency Control)实现ACID事务保障。该机制允许多个写入事务与读取事务并行执行,其核心设计假设事务冲突概率较低:在事务提交阶段通过版本号校验完成冲突检测,而非传统悲观锁的预锁定方式。这种设计有效降低锁争用,提升系统吞吐量。

 

具体写入流程包含以下关键步骤:1) 将增量数据写入新的数据文件(DataFile)及删除文件(DeleteFile);2) 生成新版本快照(Snapshot);3) 创建关联的元数据文件(MetadataFile);4) 通过CAS(Compare and Swap)原子操作更新Catalog中的元数据指针指向新版本。只有当元数据指针更新成功时,本次写入才被视为有效提交。

 

Iceberg的读写隔离机制建立在多快照之上:每个读取操作访问的是特定时间点的快照状态,而写入操作始终作用于新生成的数据文件并创建独立快照。由于快照的不可变性,读取操作无需任何锁同步机制即可实现:a) 不同Reader之间的隔离保障;b) Reader与Writer的读写隔离。这种设计使得查询性能不会因写入操作的存在而出现劣化。

 

 

 

Partition演进

 

 

在数据湖架构演进历程中,分区策略动态调整始终是核心挑战之一。传统数据湖方案实现分区优化时,需通过全表数据重分布完成物理存储结构调整,这在 PB 级数据集场景下会产生极高的计算与存储成本。

 

Iceberg通过逻辑层-物理层解耦设计创新性解决了这一难题:其分区策略作为元数据层的逻辑抽象存在,与底层数据存储路径完全解耦。当进行分区策略调整时,历史数据保持原有物理分布不变,仅新写入数据按更新后的分区规则组织,从而实现零数据迁移的分区演进。该机制使得分区优化操作从小时级降至秒级,资源消耗几乎为零。

 

更值得关注的是Iceberg的Hidden Partitioning特性:查询层无需显式指定分区键,计算引擎通过元数据自动完成数据文件过滤。这意味着业务系统可在不影响现有查询语句的前提下,持续优化数据分布策略,实现查询逻辑与存储架构的双向解耦。

 

 

 

Upsert

 

 

Iceberg支持copy-on-write(COW)和merge-on-read(MOR)两种更新方式。COW会将变更行所属的数据文件整个重写一遍生成新的文件,即使只更新了其中一行,该方式的查询效率最高,但需要付出较大的写入成本。而MOR为高频数据更新提供了更好的写入性能。当一行数据更新时,Writer将要更新的数据特征到DeleteFile中,标记之前的数据被删除了,并且将更新的数据写入到DataFile中,通过该方式MOR将行更新的写入效率做到和追加写入保持一致。在查询时,计算引擎再将DeleteFile中的记录作为墓碑屏蔽旧的数据,完成读取时的结果合并。

 

 

 

Schema演进

 

 

应用迭代的同时,底层的数据也会跟着演进。Iceberg的Schema演进支持Add、Drop、Rename、Update和Reorder,并且与Partition演进类似,在Schema演进的时候,所有的历史DataFile都不需要被重写。

 

 

 

 

Iceberg数据入湖最佳实践

 

 

 

文件管理

 

 

避免高频Commit:Iceberg每次Commit都会生成新的Snapshot,这些Snapshot信息都会维护在MetadataFile中。高频率Commit不更仅容易触发Commit冲突,而且会造成 MetadataFile膨胀,导致存储和查询成本增加。建议控制Commit间隔在1min以上,并且由中心化的Coordinator进行提交。

 

避免生成大量小文件:每个DataFile对应一个ManifestEntry,小文件数量多会导致ManifestFile体积激增,进而导致元数据存储成本上升和查询计划生成速度下降。对象存储是按照API调用次数计费,过多的小文件也会导致查询时API的调用成本上升。建议通过数据攒批写入来减少小文件的生成,后期也可以通过Compaction来将小文件合并。阿里云OSS提供了有竞争力的PUT和GET类API价格,并每月都提供了海量免费额度,可有效降低API费用。

 

 

 

Partition

 

 

采取合适的Partition策略:

 

加速查询:将高频筛选的字段(如时间、地区)优先作为分区键,在查询时通过分区裁剪减少扫描的数据量。

 

成本:在查询效率和存储成本之间平衡。分区粒度过细会产生过多小文件,导致存储效率下降。

 

 

 

 

 

Table Topic:阿里云上实时数据入湖的最佳选择

 

 

 

概览

 

 

AutoMQ Enterprise(1.4.0版本)Table Topic在Kafka Topic的基础上,将流格式存储进一步扩展成Iceberg表格式存储。数据的生产者仍旧使用Kafka协议向AutoMQ写入数据,数据可以是数据库BinLog、ClickStream和IoT等数据。AutoMQ首先会将写进来的数据低延迟写入到流格式存储,后台经过攒批后将流格式的数据转换成Iceberg表格式的数据。至此AutoMQ通过 Iceberg将Kafka里面的流数据以表格式共享给下游的数据湖计算引擎。企业无需再去维护复杂的ETL任务,仅需要使用Kafka API向AutoMQ写入数据,AutoMQ会无感将数据入湖。数据产生即就绪,业务创新零等待。

 

 

 

 

极简Data Ingest

 

 

上游的数据源使用的是Kafka协议,而不是直接面向的的Iceberg。这么做有如下2个好处:


 

数据源生态:企业现有的Kafka生产者(如Flink CDC、Logstash、Debezium)可直接接入,节省定制化开发成本。例如MySQL的BINLOG通过Debezium写入Table Topic后,AutoMQ自动完成Avro到Iceberg Schema的映射与转换。

 

低延迟&高吞吐:数据进入AutoMQ后首先会存储到Stream Storage,AutoMQ的Stream Storage具有毫秒级延迟和GB级吞吐的特征,因此企业可以获得低延迟和高吞吐的数据入湖能力。

 

 

 

 

表自动创建&演进

 

 

AutoMQ通过深度集成Kafka Schema构建自动化数据治理闭环,从根本上解决传统入湖流程中的Schema管理顽疾。其设计利用Kafka原生的Schema注册机制作为数据质量闸门:当生产者发送数据时,Schema验证层会即时拦截不符合预定义结构的脏数据(如字段类型错误、必填字段缺失等),将数据质量问题阻拦在入湖起点。

 

当上游业务系统发生Schema变更(如MySQL源表新增「用户等级」字段),AutoMQ能够实时感知Kafka消息中的Schema版本迭代,自动完成Iceberg表结构的协同演进,同时保持数据持续写入不中断。这一过程完全无需人工介入,彻底消除了传统流程中多系统间Schema手动对齐的操作风险。

 

相较于传统架构中Flink/Spark任务与表结构的强耦合(每个同步任务需硬编码目标表Schema),AutoMQ实现了Schema管理的范式转移——将原先分散在数据管道脚本、数仓元数据库、流计算引擎等多处的Schema定义收敛为Kafka Schema单一源头。这种中心化管控模式不仅减少了的元数据维护工作量,更确保了从实时接入到湖仓存储的全链路Schema一致性。

 

 

 

数据分区

 

 

AutoMQ为了提升查询时的数据过滤效率,支持同时对多个Columns进行分区,支持year、month、day、hour、bucket和truncate分区转换函数。

 

 

Properties
# config example
#The partition fields of the table.
automq.table.topic.partition.by=[bucket(user_name),
month(create_timestamp)]

 

 

 

CDC

 

 

AutoMQ支持数据以Upsert模式进行同步,AutoMQ会根据设置的Table主键和Record指定的CDC操作来进行增删改。当AutoMQ接收到Update操作的Record时,AutoMQ会首先将主键以EqualityDelete写入到DeleteFile中,标记历史记录失效,然后再在DataFile里追加更新的记录。

 

通过AutoMQ Table Topic,企业可以将数据库的BinLog写入到AutoMQ,AutoMQ会将BinLog数据通过Upsert写入到Iceberg表。数据库服务于在线OLTP业务,Iceberg服务于OLAP数据分析,通过AutoMQ Table Topic可以保持两者之间保持数据分钟级的新鲜度。

 

 

Properties
#config example
# The primary key, comma-separated list of columns that identify a row in tables.
automq.table.topic.id.columns=[email]
# The name of the field containing the CDC operation, I, U, or D
automq.table.topic.cdc.field=ops

 

 

 

 

免任务管理

 

 

AutoMQ不像使用Spark / Flink / Connector等同步组件需要编写同步任务脚本和运维同步任务。用户仅仅需要在创建Topic时打开Table Topic开关。

 

 

Properties#
The configuration controls whether enable table topic
automq.table.topic.enable=true

 

 

AutoMQ的Topic Topic能力内置在进程中,主要模块为Coordinator和Worker:

 

Coordinator:管理Table同步进度和中心化提交。Coordinator每个Table Topic独立占有一个,绑定到Topic的分区0。Coordinator根据用户设置的提交间隔触发提交,避免了每个Worker独立提交导致的提交冲突和元数据膨胀,降低存储成本和提升查询性能。

 

Wokrer:负责将Kafka Record转换成Parquet数据文件上传到阿里云对象存储OSS。Table Topic每一个分区在同进程内都有由对应的Worker绑定负责。

 

Coordinator和Worker与分区绑定,在进程中内置具有以下好处:

 

运维简单:无需额外维护一套组件,只需要关心AutoMQ集群的生命周期,无需管理同步任务。

 

同步伸缩:AutoMQ的消息写入能力与Table Topic同步能力同步匹配伸缩。当业务高峰来临,只需要根据流量上涨比例扩容AutoMQ集群即可。

 

AutoMQ Table Topic Architecture

 

 

 

 

零跨AZ流量

 

 

在传统数仓同步架构中,采用Spark、Flink或各类Connector 工具进行数据传输时,其分区调度机制通常存在显著的云环境适配性问题。由于Worker节点或Executor资源的分配策略未与云服务商可用区(AZ)拓扑结构对齐,导致同一分区的读写操作频繁跨越不同物理区域。这种设计缺陷在AWS、GCP等按流量计费的云平台中尤为突出(阿里云不会对跨AZ流量收取费用)——据统计,跨可用区数据传输成本往往占据企业大数据基础设施总支出的80%以上。

 

针对这一行业痛点,AutoMQ提出了进程内绑定调度策略。通过将Worker节点与特定可用区的数据分区进行深度耦合,系统实现了计算资源与存储资源的拓扑感知。数据流转时Worker无需通过复杂网络路径获取数据,而是以本地方法调用的方式直接从内存缓冲区捕获实时写入的数据流,随后通过上传至阿里云OSS存储桶。这种数据传输机制可减少90%以上的跨区带宽消耗,为企业构建出兼具高性能与成本效益的云原生数据管道。

 

 

 

 

 

总结

 

 

 

本文系统解析了Apache Iceberg作为云原生数据湖核心技术的核心优势与最佳实践。Iceberg通过快照隔离实现高性能ACID事务,借助逻辑-物理解耦的分区演进机制实现零成本存储优化,并支持COW/MOR两种更新模式平衡查询与写入效率。在数据入湖实践中,需关注高频提交规避与小文件治理,结合动态分区策略提升查询性能。针对实时数据入湖挑战,AutoMQ Table Topic创新性地融合Kafka协议与Iceberg表格式,通过流批自动转换、Schema自适配及进程内绑定调度实现分钟级数据新鲜度。其免ETL任务设计显著降低运维复杂度,独有的拓扑感知机制更减少90%跨可用区流量成本,为企业构建高吞吐、低延迟、低成本的一体化数据湖方案提供了新范式。阿里云OSS的AZ间流量免费,提供有竞争力的PUT和GET类API价格,和每月的API免费额度,可有效降低云上AutoMQ方案的运行成本。





我们是阿里巴巴云计算和大数据技术幕后的核心技术输出者。

欢迎关注 “阿里云基础设施”同名微信微博知乎

获取关于我们的更多信息~

相关实践学习
借助OSS搭建在线教育视频课程分享网站
本教程介绍如何基于云服务器ECS和对象存储OSS,搭建一个在线教育视频课程分享网站。
相关文章
|
1月前
|
存储 消息中间件 运维
AutoMQ x OSS 的 Iceberg 数据入湖的最佳实践
本文将从三个维度展开论述:首先分析 Iceberg 的技术优势及其成为行业标准的原因,其次详细阐述数据入湖的最佳实践方法,最后重点介绍 AutoMQ 如何利用阿里云 OSS 高效解决 Kafka 数据入湖问题。通过 AutoMQ 和阿里云服务的结合,用户可以轻松实现 Kafka 数据入湖的最佳实践。
190 15
|
7月前
|
负载均衡 Java 对象存储
负载均衡策略:Spring Cloud与Netflix OSS的最佳实践
负载均衡策略:Spring Cloud与Netflix OSS的最佳实践
110 2
|
9月前
|
存储 JSON 自然语言处理
OSS数据源一站式RAG最佳实践
本文介绍了如何使用OpenSearch LLM智能问答版通过OSS数据源一站式构建RAG系统。
7294 11
|
11月前
|
存储 Cloud Native Serverless
云原生最佳实践系列 7:基于 OSS Object FC 实现非结构化文件实时处理
阿里云OSS对象存储方案利用函数计算FC,在不同终端请求时实时处理OSS中的原图,减少衍生图存储,降低成本。
|
11月前
|
存储 运维 监控
运维编排最佳实践:将运维编排任务执行记录投递到OSS/SLS
运维编排服务(Operation Orchestration Service),简称OOS,是全面、免费的云上自动化运维平台,提供运维任务的管理和执行。典型使用场景包括:事件驱动运维,批量操作运维,定时运维任务,跨地域运维等,OOS为重要运维场景提供审批,通知等功能。OOS帮您实现标准化运维任务,从...
运维编排最佳实践:将运维编排任务执行记录投递到OSS/SLS
|
11月前
|
存储 运维 监控
通过 SLS 实现日志大数据入湖 OSS
数据湖技术在日志生态中扮演不可或缺的角色,而打通日志从生产端到数据湖的链路却比较复杂。本文将介绍基于 SLS 方案为日志入湖提供端到端(End-to-End)支持,帮助用户提升接入效率,并在费用、运维上有效降低成本。
362 1
通过 SLS 实现日志大数据入湖 OSS
|
11月前
|
存储 分布式计算 监控
操作审计最佳实践:将阿里云操作日志持续投递到您的 SLS/OSS
操作审计(ActionTrail)帮助您监控并记录阿里云账号的活动,包括通过阿里云控制台、OpenAPI、开发者工具对云上产品和服务的访问和使用行为,记录为操作日志。 操作审计支持所有阿里云账号的免开通服务,默认为所有账号记录并存储近 90 天的日志。但在实际应用中,受法律法规和企业审计标准的要求,...
686 0
|
2月前
|
SQL 分布式计算 Serverless
基于阿里云 EMR Serverless Spark 版快速搭建OSS日志分析应用
基于阿里云 EMR Serverless Spark 版快速搭建OSS日志分析应用
|
9月前
|
机器学习/深度学习 人工智能 专有云
人工智能平台PAI使用问题之怎么将DLC的数据写入到另一个阿里云主账号的OSS中
阿里云人工智能平台PAI是一个功能强大、易于使用的AI开发平台,旨在降低AI开发门槛,加速创新,助力企业和开发者高效构建、部署和管理人工智能应用。其中包含了一系列相互协同的产品与服务,共同构成一个完整的人工智能开发与应用生态系统。以下是对PAI产品使用合集的概述,涵盖数据处理、模型开发、训练加速、模型部署及管理等多个环节。
|
5月前
|
分布式计算 Java 开发工具
阿里云MaxCompute-XGBoost on Spark 极限梯度提升算法的分布式训练与模型持久化oss的实现与代码浅析
本文介绍了XGBoost在MaxCompute+OSS架构下模型持久化遇到的问题及其解决方案。首先简要介绍了XGBoost的特点和应用场景,随后详细描述了客户在将XGBoost on Spark任务从HDFS迁移到OSS时遇到的异常情况。通过分析异常堆栈和源代码,发现使用的`nativeBooster.saveModel`方法不支持OSS路径,而使用`write.overwrite().save`方法则能成功保存模型。最后提供了完整的Scala代码示例、Maven配置和提交命令,帮助用户顺利迁移模型存储路径。