Flink + Iceberg 在去哪儿的实时数仓实践

本文涉及的产品
实时计算 Flink 版,1000CU*H 3个月
简介: 本文介绍去哪儿数据平台在使用 Flink + Iceberg 0.11 的一些实践。

作者:余东

摘要: 本文介绍去哪儿数据平台在使用 Flink + Iceberg 0.11 的一些实践。内容包括:

  • 背景及痛点
  • Iceberg 架构
  • 痛点一:Kafka 数据丢失
  • 痛点二:近实时 Hive 压力大
  • Iceberg 优化实践
  • 总结

GitHub 地址
https://github.com/apache/flink
欢迎大家给 Flink 点赞送 star~

一、背景及痛点

1. 背景

我们在使用 Flink 做实时数仓以及数据传输过程中,遇到了一些问题:比如 Kafka 数据丢失,Flink 结合 Hive 的近实时数仓性能等。Iceberg 0.11 的新特性解决了这些业务场景碰到的问题。对比 Kafka 来说,Iceberg 在某些特定场景有自己的优势,在此我们做了一些基于 Iceberg 的实践分享。

2. 原架构方案

原先的架构采用 Kafka 存储实时数据,其中包括日志、订单、车票等数据。然后用 Flink SQL 或者 Flink datastream 消费数据进行流转。内部自研了提交 SQL 和 Datastream 的平台,通过该平台提交实时作业。

3. 痛点

  • Kafka 存储成本高且数据量大。Kafka 由于压力大将数据过期时间设置的比较短,当数据产生反压,积压等情况时,如果在一定的时间内没消费数据导致数据过期,会造成数据丢失。
  • Flink 在 Hive 上做了近实时的读写支持。为了分担 Kafka 压力,将一些实时性不太高的数据放入 Hive,让 Hive 做分钟级的分区。但是随着元数据不断增加,Hive metadata 的压力日益显著,查询也变得更慢,且存储 Hive 元数据的数据库压力也变大。

二、Iceberg 架构

1. Iceberg 架构解析

img

术语解析

  • 数据文件(data files)

    Iceberg 表真实存储数据的文件,一般存储在 data 目录下,以 “.parquet” 结尾。

  • 清单文件(Manifest file)

    每行都是每个数据文件的详细描述,包括数据文件的状态、文件路径、分区信息、列级别的统计信息(比如每列的最大最小值、空值数等)。通过该文件,可过滤掉无关数据,提高检索速度。

  • 快照(Snapshot)

    快照代表一张表在某个时刻的状态。每个快照版本包含某个时刻的所有数据文件列表。Data files 存储在不同的 manifest files 里面, manifest files 存储在一个 Manifest list 文件里面,而一个 Manifest list 文件代表一个快照。

2. Iceberg 查询计划

查询计划是在表中查找 “查询所需文件” 的过程。

  • 元数据过滤

    清单文件包括分区数据元组和每个数据文件的列级统计信息。在计划期间,查询谓词会自动转换为分区数据上的谓词,并首先应用于过滤数据文件。接下来,使用列级值计数,空计数,下限和上限来消除与查询谓词不匹配的文件。

  • Snapshot ID

    每个 Snapshot ID 会关联到一组 manifest files,而每一组 manifest files 包含很多 manifest file。

  • manifest files 文件列表

    每个 manifest files 又记录了当前 data 数据块的元数据信息,其中就包含了文件列的最大值和最小值,然后根据这个元数据信息,索引到具体的文件块,从而更快的查询到数据。

三、痛点一:Kafka 数据丢失

1. 痛点介绍

通常我们会选择 Kafka 做实时数仓,以及日志传输。Kafka 本身存储成本很高,且数据保留时间有时效性,一旦消费积压,数据达到过期时间后,就会造成数据丢失且没有消费到。

2. 解决方案

将实时要求不高的业务数据入湖、比如说能接受 1-10 分钟的延迟。因为 Iceberg 0.11 也支持 SQL 实时读取,而且还能保存历史数据。这样既可以减轻线上 Kafka 的压力,还能确保数据不丢失的同时也能实时读取。

3 .为什么 Iceberg 只能做近实时入湖?

img

  1. Iceberg 提交 Transaction 时是以文件粒度来提交。这就没法以秒为单位提交 Transaction,否则会造成文件数量膨胀;
  2. 没有在线服务节点。对于实时的高吞吐低延迟写入,无法得到纯实时的响应;
  3. Flink 写入以 checkpoint 为单位,物理数据写入 Iceberg 后并不能直接查询,当触发了 checkpoint 才会写 metadata 文件,这时数据由不可见变为可见。checkpoint 每次执行都会有一定时间。

4. Flink 入湖分析

img

组件介绍

  • IcebergStreamWriter

    主要用来写入记录到对应的 avro、parquet、orc 文件,生成一个对应的 Iceberg DataFile,并发送给下游算子。

另外一个叫做 IcebergFilesCommitter,主要用来在 checkpoint 到来时把所有的 DataFile 文件收集起来,并提交 Transaction 到 Apache Iceberg,完成本次 checkpoint 的数据写入,生成 DataFile。

  • IcebergFilesCommitter

    为每个 checkpointId 维护了一个 DataFile 文件列表,即 map<Long, List>,这样即使中间有某个 checkpoint 的 transaction 提交失败了,它的 DataFile 文件仍然维护在 State 中,依然可以通过后续的 checkpoint 来提交数据到 Iceberg 表中。

5. Flink SQL Demo

Flink Iceberg 实时入湖流程,消费 Kafka 数据写入 Iceberg,并从 Iceberg 近实时读取数据。

img

5.1 前期工作

  • 开启实时读写功能

    set execution.type = streaming

  • 开启 table sql hint 功能来使用 OPTIONS 属性

    set table.dynamic-table-options.enabled=true

  • 注册 Iceberg catalog 用于操作 Iceberg 表

    CREATE CATALOG Iceberg_catalog WITH (\n" +
                "  'type'='Iceberg',\n" +
                "  'catalog-type'='Hive'," +
                "  'uri'='thrift://localhost:9083'" +
                ");
  • Kafka 实时数据入湖

    insert into Iceberg_catalog.Iceberg_db.tbl1 \n 
                select * from Kafka_tbl;
  • 数据湖之间实时流转 tbl1 -> tbl2

      insert into Iceberg_catalog.Iceberg_db.tbl2  
        select * from Iceberg_catalog.Iceberg_db.tbl1 
        /*+ OPTIONS('streaming'='true', 
    'monitor-interval'='10s',snapshot-id'='3821550127947089987')*/

5.2 参数解释

  • monitor-interval

    连续监视新提交的数据文件的时间间隔(默认值:1s)。

  • start-snapshot-id

    从指定的快照 ID 开始读取数据、每个快照 ID 关联的是一组 manifest file 元数据文件,每个元数据文件映射着自己的真实数据文件,通过快照 ID,从而读取到某个版本的数据。

6. 踩坑记录

我之前在 SQL Client 写数据到 Iceberg,data 目录数据一直在更新,但是 metadata 没有数据,导致查询的时候没有数,因为 Iceberg 的查询是需要元数据来索引真实数据的。SQL Client 默认没有开启 checkpoint,需要通过配置文件来开启状态。所以会导致 data 目录写入数据而 metadata 目录不写入元数据。

PS:无论通过 SQL 还是 Datastream 入湖,都必须开启 Checkpoint。

7. 数据样例

下面两张图展示的是实时查询 Iceberg 的效果,一秒前和一秒后的数据变化情况。

  • 一秒前的数据

img

  • 一秒后刷新的数据

img

四、痛点二:Flink 结合 Hive 的近实时越来越慢

1. 痛点介绍

选用 Flink + Hive 的近实时架构虽然支持了实时读写,但是这种架构带来的问题是随着表和分区增多,将会面临以下问题:

  • 元数据过多

    Hive 将分区改为小时 / 分钟级,虽然提高了数据的准实时性,但是 metestore 的压力也是显而易见的,元数据过多导致生成查询计划变慢,而且还会影响线上其他业务稳定。

  • 数据库压力变大

    随着元数据增加,存储 Hive 元数据的数据库压力也会增加,一段时间后,还需要对该库进行扩容,比如存储空间。

img

img

2. 解决方案

将原先的 Hive 近实时迁移到 Iceberg。为什么 Iceberg 可以处理元数据量大的问题,而 Hive 在元数据大的时候却容易形成瓶颈?

  • Iceberg 是把 metadata 维护在可拓展的分布式文件系统上,不存在中心化的元数据系统;
  • Hive 则是把 partition 之上的元数据维护在 metastore 里面(partition 过多则给 mysql 造成巨大压力),而 partition 内的元数据其实是维护在文件内的(启动作业需要列举大量文件才能确定文件是否需要被扫描,整个过程非常耗时)。

img

五、优化实践

1. 小文件处理

  • Iceberg 0.11 以前,通过定时触发 batch api 进行小文件合并,这样虽然能合并,但是需要维护一套 Actions 代码,而且也不是实时合并的。

    Table table = findTable(options, conf);
    Actions.forTable(table).rewriteDataFiles()
            .targetSizeInBytes(10 * 1024) // 10KB
            .execute();
  • Iceberg 0.11 新特性,支持了流式小文件合并。

    通过分区/存储桶键使用哈希混洗方式写数据、从源头直接合并文件,这样的好处在于,一个 task 会处理某个分区的数据,提交自己的 Datafile 文件,比如一个 task 只处理对应分区的数据。这样避免了多个 task 处理提交很多小文件的问题,且不需要额外的维护代码,只需在建表的时候指定属性 write.distribution-mode,该参数与其它引擎是通用的,比如 Spark 等。

    CREATE TABLE city_table ( 
         province BIGINT,
         city STRING
      ) PARTITIONED BY (province, city) WITH (
        'write.distribution-mode'='hash' 
      );

2. Iceberg 0.11 排序

2.1 排序介绍

在 Iceberg 0.11 之前,Flink 是不支持 Iceberg 排序功能的,所以之前只能结合 Spark 以批模式来支持排序功能,0.11 新增了排序特性的支持,也意味着,我们在实时也可以体会到这个好处。

排序的本质是为了扫描更快,因为按照 sort key 做了聚合之后,所有的数据都按照从小到大排列,max-min 可以过滤掉大量的无效数据。

img

2.2 排序 demo

insert into Iceberg_table select days from Kafka_tbl order by days, province_id;

3. Iceberg 排序后 manifest 详解

img

参数解释

  • file_path:物理文件位置。
  • partition:文件所对应的分区。
  • lower_bounds:该文件中,多个排序字段的最小值,下图是我的 days 和 province_id 最小值。
  • upper_bounds:该文件中,多个排序字段的最大值,下图是我的 days 和 province_id 最大值。

通过分区、列的上下限信息来确定是否读取 file_path 的文件,数据排序后,文件列的信息也会记录在元数据中,查询计划从 manifest 去定位文件,不需要把信息记录在 Hive metadata,从而减轻 Hive metadata 压力,提升查询效率。

利用 Iceberg 0.11 的排序特性,将天作为分区。按天、小时、分钟进行排序,那么 manifest 文件就会记录这个排序规则,从而在检索数据的时候,提高查询效率,既能实现 Hive 分区的检索优点,还能避免 Hive metadata 元数据过多带来的压力。

六、总结

相较于之前的版本来说,Iceberg 0.11 新增了许多实用的功能,对比了之前使用的旧版本,做以下总结:

  • Flink + Iceberg 排序功能

    在 Iceberg 0.11 以前,排序功能集成了 Spark,但没有集成 Flink,当时用 Spark + Iceberg 0.10 批量迁移了一批 Hive 表。在 BI 上的收益是: 原先 BI 为了提升 Hive 查询速度建了多级分区,导致小文件和元数据过多,入湖过程中,利用 Spark 排序 BI 经常查询的条件,结合隐式分区,最终提升 BI 检索速度的同时,也没有小文件的问题,Iceberg 有自身的元数据,也减少了 Hive metadata 的压力。

    Icebeg 0.11 支持了 Flink 的排序,是一个很实用的功能点。我们可以把原先 Flink + Hive 的分区转移到 Iceberg 排序中,既能达到 Hive 分区的效果,也能减少小文件和提升查询效率。

  • 实时读取数据

    通过 SQL 的编程方式,即可实现数据的实时读取。好处在于,可以把实时性要求不高的,比如业务可以接受 1-10 分钟延迟的数据放入 Iceberg 中 ,在减少 Kafka 压力的同时,也能实现数据的近实时读取,还能保存历史数据。

  • 实时合并小文件

    在Iceberg 0.11以前,需要用 Iceberg 的合并 API 来维护小文件合并,该 API 需要传入表信息,以及定时信息,且合并是按批次这样进行的,不是实时的。从代码上来说,增加了维护和开发成本;从时效性来说,不是实时的。0.11 用 Hash 的方式,从源头对数据进行实时合并,只需在 SQL 建表时指定 ('write.distribution-mode'='hash') 属性即可,不需要手工维护。


更多 Flink 相关技术交流,可扫码加入社区钉钉大群~

image.png

活动推荐

阿里云基于 Apache Flink 构建的企业级产品-实时计算Flink版现开启活动:
99元试用实时计算Flink版(包年包月、10CU)即有机会获得 Flink 独家定制T恤;另包3个月及以上还有85折优惠!
了解活动详情:https://www.aliyun.com/product/bigdata/sc

image.png

相关文章
|
5月前
|
SQL 分布式计算 DataWorks
破界·融合·进化:解码DataWorks与Hologres的湖仓一体实践
基于阿里云DataWorks与实时数仓Hologres,提供统一的大数据开发治理平台与全链路实时分析能力。DataWorks支持多行业数据集成与管理,Hologres实现海量数据的实时写入与高性能查询分析,二者深度融合,助力企业构建高效、实时的数据驱动决策体系,加速数字化升级。
|
7月前
|
存储 消息中间件 OLAP
基于 Flink+Paimon+Hologres 搭建淘天集团湖仓一体数据链路
本文整理自淘天集团高级数据开发工程师朱奥在Flink Forward Asia 2024的分享,围绕实时数仓优化展开。内容涵盖项目背景、核心策略、解决方案、项目价值及未来计划五部分。通过引入Paimon和Hologres技术,解决当前流批存储不统一、实时数据可见性差等痛点,实现流批一体存储与高效近实时数据加工。项目显著提升了数据时效性和开发运维效率,降低了使用门槛与成本,并规划未来在集团内推广湖仓一体架构,探索更多技术创新场景。
1422 3
基于 Flink+Paimon+Hologres 搭建淘天集团湖仓一体数据链路
|
8月前
|
消息中间件 存储 监控
Lalamove基于Flink实时湖仓演进之路
本文由货拉拉国际化技术部资深数据仓库工程师林海亮撰写,围绕Flink在实时数仓中的应用展开。文章首先介绍了Lalamove业务背景,随后分析了Flink在实时看板、数据服务API、数据监控及数据分析中的应用与挑战,如多数据中心、时区差异、上游改造频繁及高成本问题。接着阐述了实时数仓架构从无分层到引入Paimon湖仓的演进过程,解决了数据延迟、兼容性及资源消耗等问题。最后展望未来,提出基于Fluss+Paimon优化架构的方向,进一步提升性能与降低成本。
326 11
Lalamove基于Flink实时湖仓演进之路
|
8月前
|
存储 监控 数据挖掘
京东物流基于Flink & StarRocks的湖仓建设实践
本文整理自京东物流高级数据开发工程师梁宝彬在Flink Forward Asia 2024的分享,聚焦实时湖仓的探索与建设、应用实践、问题思考及未来展望。内容涵盖京东物流通过Flink和Paimon等技术构建实时湖仓体系的过程,解决复杂业务场景下的数据分析挑战,如多维OLAP分析、大屏监控等。同时,文章详细介绍了基于StarRocks的湖仓一体方案,优化存储成本并提升查询效率,以及存算分离的应用实践。最后,对未来数据服务的发展方向进行了展望,计划推广长周期数据存储服务和原生数据湖建设,进一步提升数据分析能力。
752 1
京东物流基于Flink & StarRocks的湖仓建设实践
|
8月前
|
存储 SQL 运维
中国联通网络资源湖仓一体应用实践
本文分享了中国联通技术专家李晓昱在Flink Forward Asia 2024上的演讲,介绍如何借助Flink+Paimon湖仓一体架构解决传统数仓处理百亿级数据的瓶颈。内容涵盖网络资源中心概况、现有挑战、新架构设计及实施效果。新方案实现了数据一致性100%,同步延迟从3小时降至3分钟,存储成本降低50%,为通信行业提供了高效的数据管理范例。未来将深化流式数仓与智能运维融合,推动数字化升级。
343 0
中国联通网络资源湖仓一体应用实践
|
3月前
|
存储 JSON 数据处理
Flink基于Paimon的实时湖仓解决方案的演进
本文源自Apache CommunityOverCode Asia 2025,阿里云专家苏轩楠分享Flink与Paimon构建实时湖仓的演进实践。深度解析Variant数据类型、Lookup Join优化等关键技术,提升半结构化数据处理效率与系统可扩展性,推动实时湖仓在生产环境的高效落地。
382 0
Flink基于Paimon的实时湖仓解决方案的演进
|
4月前
|
SQL 存储 运维
Apache Doris 在菜鸟的大规模湖仓业务场景落地实践
本文介绍了 Apache Doris 在菜鸟的大规模落地的实践经验,菜鸟为什么选择 Doris,以及 Doris 如何在菜鸟从 0 开始,一步步的验证、落地,到如今上万核的规模,服务于各个业务线,Doris 已然成为菜鸟 OLAP 数据分析的最优选型。
296 2
Apache Doris 在菜鸟的大规模湖仓业务场景落地实践
|
8月前
|
存储 消息中间件 分布式计算
Hologres实时数仓在B站游戏的建设与实践
本文介绍了B站游戏业务中实时数据仓库的构建与优化过程。为满足日益增长的数据实时性需求,采用了Hologres作为核心组件优化传统Lambda架构,实现了存储层面的流批一体化及离线-实时数据的无缝衔接。文章详细描述了架构选型、分层设计(ODS、DWD、DIM、ADS)及关键技术挑战的解决方法,如高QPS点查、数据乱序重写等。目前,该实时数仓已广泛应用于运营分析、广告投放等多个场景,并计划进一步完善实时指标体系、扩展明细层应用及研发数据实时解析能力。
Hologres实时数仓在B站游戏的建设与实践
|
3月前
|
存储 人工智能 监控
淘宝闪购基于Flink&Paimon的Lakehouse生产实践:从实时数仓到湖仓一体化的演进之路
本文整理自淘宝闪购(饿了么)大数据架构师王沛斌在 Flink Forward Asia 2025 上海站的分享,深度解析其基于 Apache Flink 与 Paimon 的 Lakehouse 架构演进与落地实践,涵盖实时数仓发展、技术选型、平台建设及未来展望。
774 0
淘宝闪购基于Flink&Paimon的Lakehouse生产实践:从实时数仓到湖仓一体化的演进之路
|
9月前
|
存储 分布式计算 MaxCompute
Hologres实时湖仓能力入门实践
本文由武润雪(栩染)撰写,介绍Hologres 3.0版本作为一体化实时湖仓平台的升级特性。其核心能力包括湖仓存储一体、多模式计算一体、分析服务一体及Data+AI一体,极大提升数据开发效率。文章详细解析了两种湖仓架构:MaxCompute + Hologres实现离线实时一体化,以及Hologres + DLF + OSS构建开放湖仓架构,并深入探讨元数据抽象、权限互通等重点功能,同时提供具体使用说明与Demo演示。

相关产品

  • 实时计算 Flink版