字节跳动基于Apache Hudi构建EB级数据湖实践

简介: 字节跳动基于Apache Hudi构建EB级数据湖实践

接下来将分为场景需求、设计选型、功能支持、性能调优、未来展望五部分介绍Hudi在字节跳动推荐系统中的实践。

在推荐系统中,我们在两个场景下使用数据湖

1.我们使用BigTable作为整个系统近线处理的数据存储,这是一个公司自研的组件TBase,提供了BigTable的语义和搜索推荐广告场景下一些需求的抽象,并屏蔽底层存储的差异。为了更好的理解,这里可以把它直接看做一个HBase。在这过程中为了能够服务离线对数据的分析挖掘需求,需要将数据导出到离线存储中。在过去用户或是使用MR/Spark直接访问存储,或是通过扫库的方式获取数据,不符合OLAP场景下的数据访问特性。因此我们基于数据湖构建BigTable的CDC,提高数据时效,减少近线系统访问压力,提供高效的OLAP访问和用户友好的SQL消费方式。2.除此之外,我们还在特征工程与模型训练的场景中使用数据湖。我们从内部和外部分别获得两类实时数据流,一个是来自系统内部回流的Instance,包含了推荐系统Serving时获得的Feature。另一种是来自端上埋点/多种复杂外部数据源的反馈,这类数据作为Label,和之前的feature共同组成了完整的机器学习样本。针对这个场景,我们需要实现一个基于主键的拼接操作,将Instance和Label Merge到一起。开窗范围可能长达数十天,千亿行量级。需要支持高效的列式选取和谓词下推。同时还需要支持并发Update等相关能力。

在这两个场景下存在如下挑战

1.数据的非常不规整。相比Binlog,WAL没法获得一行的全部信息,同时数据大小变化非常大。2.吞吐量比较大,单表吞吐超百GB/s,单表PB级存储。3.数据Schema 复杂。数据存在高维、稀疏等现象。表列数从1000-10000+都有。并且有大量复杂数据类型。

在引擎选型时,我们考察过Hudi,Iceberg,DeltaLake三个最热门的数据湖引擎。三者在我们的场景下各有优劣,最终基于Hudi对上下游生态的开放,对全局索引的支持,对若干存储逻辑提供了定制化的开发接口等原因,选择了Hudi作为存储引擎。

针对实时写入,选择了时效性更好的MOR。考察了索引类型,首先因为WAL不能每次都获取到数据的分区,所以必须要全局索引。在几种全局索引实现中,为了实现高性能的写入,HBase是唯一的选择。另外两种的实现决定了都和HBase在性能有本质上的差距。在计算引擎上和API上,当时Hudi对Flink的支持还不是特别完善,所以选择了更为成熟的Spark,为了能灵活实现一些定制功能和逻辑,也因为DataFrame的API语义限制比较多,所以选择了更底层的RDD API。

功能支持包括存储语义的MVCC和Schema注册系统。

首先为了支持WAL语义的写入,我们实现了针对MVCC的Payload,基于Avro自定义了一套带时间戳的数据结构实现。并通过视图访问的方式对用户屏蔽了这套逻辑。除此之外还实现了HBase Append的语义,可以实现对List类型的追加写而非覆盖写。

由于Hudi本身的Schema从Write的数据中获取,这种方式和其他系统对接不是很方便,以及我们需要一些基于Schema的扩展功能,所以我们构建了一个元数据中心来提供元数据相关的操作。

首先我们基于一种内部的存储提供的语义实现了原子变更和异地多活。用户可以通过接口原子地触发Schema变更并立刻获得结果。并通过加入版本号的方法实现了Schema的多版本,有了版本号之后可以方便的使用Schema而不是把Json传来传去。有了多版本也可以实现Schema更灵活的演进。我们还支持了列级别的额外信息编码,来帮助业务实现一些场景下特有的扩展功能。并把列名替换成了数字来节约使用过程中的开销。Hudi的Spark Job在使用的时候会在JVM级别构建一个local cache并通过pull的方式和元数据中心同步数据,实现Schema的快速访问和进程内Schema的单例。

在我们场景下性能挑战比较大,最大单表数据量达400PB+,日增PB级数据量,总数据量达EB级别,因此我们针对性能和数据特性开发做了一些工作来提高性能。

序列化方面包括如下优化

1.Schema:数据使用Avro序列化开销特别大,而且消耗资源也非常多。针对这个问题,我们首先借助Schema的JVM单例,规避了序列化过程中很多费CPU的比较操作。2.通过优化Payload逻辑,减少了需要序列化的次数。3.借助了第三方的Avro序列化实现,通过将序列化过程编译成字节码的方式来提高SerDe的速度以及降低内存占用。对这种序列化形式做了修改,以保证我们的复杂Schema也能够正常编译。

对于Compaction流程优化如下

Hudi除了默认的Inline/Async compaction选项之外,还支持Compaction的灵活部署。Compaction Job的作业特性和Ingestion作业其实有较大区别。在同一个Spark Application当中不仅不能针对性设置,也存在资源弹性不足的问题。我们首先构建了独立部署的脚本,让Compaction作业可以独立触发运行。使用了低成本的混部队列并可以针对此次Compaction的Plan做资源申请。除此之外还做了基于规则和启发式的Compaction Strategy,用户的需求通常是保证天级别或者小时级别的SLA,并针对性地压缩某些分区的数据,所以提供了针对性压缩的能力。为了能缩短关键Compaction的时间,我们通常会提前做Compaction来避免所有工作都在一个Compaction Job中完成。但是如果一个Compact过的FileGroup又有新的Update,就不得不再次Compact。为了优化整体的效率,我们根据业务信息对一个FileGroup该在何时被压缩做了启发式的调度以减少额外的压缩损耗。该特性的具体收益还在评估中。最后我们对Compaction做了一些流程的优化,比如不使用WriteStatus的Cache等等。

HDFS作为一种面向吞吐设计的存储,在集群水位比较高的情况下,实时写入毛刺比较严重。通过和HDFS团队的沟通与合作,做了相关的一些工作。

首先把原有的数据HSync操作替换为HFlush,避免了分散性update导致的磁盘IO写放大。针对场景调参做了激进的pipeline切换设置,并且HDFS团队开发了灵活的可以控制pipeline的api,来实现这个场景下灵活的配置。最后还通过logfile独立IO隔离的方式保证了实时写入的时效性。

还有一些零零碎碎的性能提升,流程修改和Bug Fix,大家感兴趣可以找我交流。

未来我们会在以下几个方面持续迭代。

产品化问题:目前使用的API和调参调优方式都对用户要求很高,尤其是调参和运维,需要对Hudi原理有相当的了解才可以完成,不利于用户推广使用。生态对接问题:在我们的场景中,技术栈以Flink为主,未来会探索Flink的使用。除此之外上下游使用的应用和环境也比较复杂,非常需要跨语言和通用的接口实现。目前和Spark绑定过于严重。成本和性能问题:老生常谈的话题,由于我们场景比较大,所以在这块优化上的收益非常可观。存储语义:我们把Hudi当做一种存储来使用而非一种表格式。所以未来会拓展Hudi的使用场景,需要更丰富的存储语义,会在这方面做更多的工作。

目录
打赏
0
2
2
0
39
分享
相关文章
天翼云:Apache Doris + Iceberg 超大规模湖仓一体实践
天翼云基于 Apache Doris 成功落地项目已超 20 个,整体集群规模超 50 套,部署节点超 3000 个,存储容量超 15PB
天翼云:Apache Doris + Iceberg 超大规模湖仓一体实践
小米基于 Apache Paimon 的流式湖仓实践
本文整理自Flink Forward Asia 2024流式湖仓专场分享,由计算平台软件研发工程师钟宇江主讲。内容涵盖三部分:1)背景介绍,分析当前实时湖仓架构(如Flink + Talos + Iceberg)的痛点,包括高成本、复杂性和存储冗余;2)基于Paimon构建近实时数据湖仓,介绍其LSM存储结构及应用场景,如Partial-Update和Streaming Upsert,显著降低计算和存储成本,简化架构;3)未来展望,探讨Paimon在流计算中的进一步应用及自动化维护服务的建设。
小米基于 Apache Paimon 的流式湖仓实践
金融场景 PB 级大规模日志平台:中信银行信用卡中心从 Elasticsearch 到 Apache Doris 的先进实践
中信银行信用卡中心每日新增日志数据 140 亿条(80TB),全量归档日志量超 40PB,早期基于 Elasticsearch 构建的日志云平台,面临存储成本高、实时写入性能差、文本检索慢以及日志分析能力不足等问题。因此使用 Apache Doris 替换 Elasticsearch,实现资源投入降低 50%、查询速度提升 2~4 倍,同时显著提高了运维效率。
金融场景 PB 级大规模日志平台:中信银行信用卡中心从 Elasticsearch 到 Apache Doris 的先进实践
Cisco WebEx 数据平台:统一 Trino、Pinot、Iceberg 及 Kyuubi,探索 Apache Doris 在 Cisco 的改造实践
Cisco WebEx 早期数据平台采用了多系统架构(包括 Trino、Pinot、Iceberg 、 Kyuubi 等),面临架构复杂、数据冗余存储、运维困难、资源利用率低、数据时效性差等问题。因此,引入 Apache Doris 替换了 Trino、Pinot 、 Iceberg 及 Kyuubi 技术栈,依赖于 Doris 的实时数据湖能力及高性能 OLAP 分析能力,统一数据湖仓及查询分析引擎,显著提升了查询性能及系统稳定性,同时实现资源成本降低 30%。
Cisco WebEx 数据平台:统一 Trino、Pinot、Iceberg 及 Kyuubi,探索 Apache Doris 在 Cisco 的改造实践
The Past, Present and Future of Apache Flink
本文整理自阿里云开源大数据负责人王峰(莫问)在 Flink Forward Asia 2024 上海站主论坛开场的分享,今年正值 Flink 开源项目诞生的第 10 周年,借此时机,王峰回顾了 Flink 在过去 10 年的发展历程以及 Flink社区当前最新的技术成果,最后展望下一个十年 Flink 路向何方。
463 33
The Past, Present and Future of Apache Flink
Apache Flink 2.0.0: 实时数据处理的新纪元
Apache Flink 2.0.0 正式发布!这是自 Flink 1.0 发布九年以来的首次重大更新,凝聚了社区两年的努力。此版本引入分离式状态管理、物化表、流批统一等创新功能,优化云原生环境下的资源利用与性能表现,并强化了对人工智能工作流的支持。同时,Flink 2.0 对 API 和配置进行了全面清理,移除了过时组件,为未来的发展奠定了坚实基础。感谢 165 位贡献者的辛勤付出,共同推动实时计算进入新纪元!
155 1
Apache Flink 2.0.0: 实时数据处理的新纪元
|
6月前
|
Apache Flink 2.0-preview released
Apache Flink 社区正积极筹备 Flink 2.0 的发布,这是自 Flink 1.0 发布以来的首个重大更新。Flink 2.0 将引入多项激动人心的功能和改进,包括存算分离状态管理、物化表、批作业自适应执行等,同时也包含了一些不兼容的变更。目前提供的预览版旨在让用户提前尝试新功能并收集反馈,但不建议在生产环境中使用。
1197 13
Apache Flink 2.0-preview released
分布式锁服务深度解析:以Apache Flink的Checkpointing机制为例
【10月更文挑战第7天】在分布式系统中,多个进程或节点可能需要同时访问和操作共享资源。为了确保数据的一致性和系统的稳定性,我们需要一种机制来协调这些进程或节点的访问,避免并发冲突和竞态条件。分布式锁服务正是为此而生的一种解决方案。它通过在网络环境中实现锁机制,确保同一时间只有一个进程或节点能够访问和操作共享资源。
207 3
Apache Doris Flink Connector 24.0.0 版本正式发布
该版本新增了对 Flink 1.20 的支持,并支持通过 Arrow Flight SQL 高速读取 Doris 中数据。

推荐镜像

更多