Flink OLAP 在字节跳动的查询优化和落地实践

简介: 字节跳动基础架构工程师何润康在 Flink Forward Asia 2022 核心技术专场的分享。

本文整理自字节跳动基础架构工程师何润康在 Flink Forward Asia 2022 核心技术专场的分享。Flink OLAP 是数据仓库系统的重要应用,支持复杂的分析型查询,广泛应用于数据分析、商业决策等场景。本次分享将围绕字节 Flink OLAP 整体介绍、查询优化、集群运维和稳定性建设、收益以及未来规划五个方面展开介绍。

点击查看原文视频 & 演讲PPT

一、字节 Flink OLAP 介绍

1

字节 Flink OLAP 上线以来接入了包括 User Growth、飞书、电商和幸福里等 12 家以上核心业务方,集群规模达到 1.6 万 Core 以上,每天的查询规模超过 50w 次,单集群支持了复杂查询高峰期的 200 QPS,同时 Query Latency P99 控制在 5s 以内,较好的满足了业务的性能需求。

2

Flink OLAP 的总体架构分为 Flink SQL Gateway 和 Flink Session Cluster 两部分。

首先,用户通过 Client 提交一个 Query,先经过 Gateway 的 SQL 解析和优化过程,生成作业的执行计划,再提交给 Flink Session Cluster 的 JobManager,JobManager 的 Dispatcher 组件会创建一个对应的 JobMaster,并根据特定的调度规则将 Task 部署到对应的 TaskManager 上执行,最后将执行的结果返回给 Client。

3

Flink OLAP 是作为内部自研的高性能 HTAP 产品 -- ByteHTAP 的 AP 引擎,用于支持内部的核心业务。通过支持双机房部署提高容灾能力,每个新接入的业务可以在双机房垂直部署两套 AP 集群,在线上集群出现严重故障时,可以通过 Proxy 快速切流到另一个集群,从而提高服务的可用性。

4

Flink 在流式场景的应用已经十分成熟,在批式场景的应用也在逐步扩大,但是在 OLAP 场景下的打磨和使用则较少。字节 Flink OLAP 在真实的业务落地过程中遇到了很多问题和挑战,主要分为对性能和运维稳定性的挑战。

在性能方面的一大挑战是 OLAP 业务要求亚秒级的作业 Latency,这和流批有很大的不同,流式和批式主要关注数据的处理速度,而不需要关注 Plan 构建、Task 初始化等阶段的耗时。但是在 OLAP 场景下,优化这些阶段的耗时就变得非常重要。另外,字节 Flink OLAP 基于存算分离架构,有更加强烈的算子下推需求。

另一个挑战是,OLAP 业务要求较高的 QPS,所以当 OLAP 集群频繁地创建和执行作业,某些情况下会导致集群出现严重的性能问题,但是在流式和批式下只需要执行一次通常不会出现问题。因此,针对以上不同,在 OLAP 场景下进行了很多查询相关的优化,比如 Plan 的构建加速和初始化等相关优化。

5

在业务的落地过程中,OLAP 和流批场景有很大的不同,运维、监控和稳定性都需要针对 OLAP 场景单独构建。

在运维方面,OLAP 是在线服务,对可用性的要求很高,所以完善测试流程和测试场景是非常必要的,可以减少线上 Bug 的概率。另外在运维升级时,不同于流批作业的直接重启升级,OLAP 集群的运维升级因为不能中断用户使用,所以如何做到无感知升级是一个挑战。

在监控方面,为了保障在线服务的可用性,线上集群出现问题后,需要及时进行故障恢复和定位。因此针对 OLAP 下的监控体系就尤为重要。除了流批的集群状态监控外,OLAP 场景下特有的慢查询分析和监控,是需要额外构建的。

在稳定性方面,第一个挑战是建设 OLAP 容灾能力。流批和 OLAP 的故障恢复策略不同,流式作业通过 Failover 来恢复,批式作业通过作业重跑或 Failover 来恢复。在 OLAP 下,多个作业同时运行在一个在线集群上,单个作业失败可以重试,但是整个集群出现无法恢复的故障时,如果采用重启恢复,分钟级别的耗时对于线上服务是无法接受的。第二个挑战是 Full GC 的治理,流批作业对 Full GC 的容忍度相对较高,但是 OLAP 下业务对 Latency 非常敏感,而且 Full GC 还会导致同时运行的其它作业变慢,严重影响用户体验。

二、查询优化

6

在 OLAP 场景下,Query 有两个典型的特点:业务上重复的 Query 和亚秒级的查询耗时。通过分析发现,Plan 阶段的耗时为几十到几百毫秒,占比较高。因此支持了 Plan 缓存,避免相同 Query 的重复 Plan;此外也支持了 Catalog Cache,加速元信息的访问,还支持 ExecNode 的并行 Translate,使 TPC-DS Plan 的耗时降低了 10% 左右。

7

在存算分离架构下,算子下推是一类非常重要的优化。核心思路是尽可能的将一些算子下推到存储层进行计算,大幅减少 Scan 的数据量,降低外部的 IO,同时也能够减少 Flink 引擎需要处理的数据量,从而明显提升 Query 的性能。

TopN 下推:在字节内部的一个典型业务上,大部分 Query 都是取 TopN 的数据。通过支持 TopN 的下推优化,把 Local SortLimit 算子,也就是 Local 的 TopN 算子,下推到了 Scan 节点,最终在存储层做 TopN 计算,从而大幅降低从存储读取的数据量。经过优化后,读取数据量降低了 99.9%,业务 Query 的 Latency 降低了 90.4%。除此之外,也支持了包括 Aggregate、Filter、Limit 等更多的算子下推。

8

跨 Union All 的常见算子下推:字节内部某个业务的数据是按照典型的分库分表存放的,在该场景下,用户如果需要查询全量数据,会对多张表进行 Union All 后再进行计算。目前,Flink Planner 缺乏对常用算子跨 Union All 下推的支持,导致用户查询会从 Source 读取大量的数据,并且处理这些数据也会占用大量的资源,最终导致资源消耗和 E2E Latency 都较高。因此支持了常用算子跨 Union All 下推的优化,包括 Aggregate,SortLimit 和 Limit 算子。

以 Aggregate 为例,从图中可以看出,在优化之前,Union All 节点的下游是一个 Local Aggregate 节点。由于当前 Flink Planner 不支持跨 Union All 的算子下推,导致这里的 Local Aggregate 节点无法下推到 Union All 的上游,也无法进一步下推到 Scan 节点,导致从存储读取了大量的数据。优化之后把 Local Aggregate 节点推到了 Union All 的上游,最终下推到了存储做计算。经过优化后,业务查询的 E2E Latency 降低 42%,Flink 集群的 CPU 消耗降低 30%。

9

在线上业务的查询中,带 Join 的查询是非常多的,其中大部分的查询是 Equal Join,并且带一个 Filter 条件。但是由于 Join 一侧的 Filter 没有传递到 Join 的另一侧,从而导致 Scan 的数据量较大,进而影响查询性能。

因此支持了 Join Filter 的传递。从上图中可以看出,t1 表的 Filter t1.id > 1,可以通过 Equal 的 Join 条件 t1.id=t2.id,推导出 t2.id>1。因此可以推到 t2 Scan 节点的上游,同时由于支持了 Filter 传递,最终 t2.id>1 会被下推到存储做计算,那么从 t2 的 Scan 节点读取的数据会大幅减少,从而提升查询性能。

11

在线上集群持续运行的过程中,我们发现了JM / TM 进程频繁创建 Classloader,导致 CPU 占用过高的问题。通过火焰图分析,JVM Dictionary::find 占据了 70% 以上的 CPU,进一步分析 JVM 源码发现,JVM 在加载了 class 之后,为了加速从 class name 到 Classloader 的查找,会维护一个名叫 SystemDictionary 的哈希表。在 Classloader 数量非常多的时候,哈希表中存在大量的冲突,导致查找过程非常缓慢,同时整个 JM 大部分的 CPU 都消耗在这个步骤。

通过定位发现,这些 Classloader 都是 UserCodeClassloader,用于动态加载用户的 Jar 包。从图中看出,新 Job 的 JobMaster 和 TM 上该 Job 的 Task 都会创建新的 UserCodeClassloader,导致 JM 和 TM 上的 Classloader 过多。除此之外,Classloader 过多还会导致 JVM Metaspace 空间不足,进而频繁地触发 Metaspace Full GC。

12

因此支持了 Classloader 复用的优化,分为两步:首先优化依赖 Jar 包的方式,由于 OLAP 场景下依赖的第三方 Jar 包是相对固定的,可以直接放在 JM 和 TM 启动的 Classpath 下,并不需要每个作业单独提交 Jar 包。其次,对于每个作业在 JobMaster 和 Task 初始化时,直接复用 System Classloader。经过优化后,JM 中 Dictionary::find 所占的 CPU 使用从 76% 下降到 1%,同时,Metaspace Full GC 的频率显著降低。

14

在 OLAP 场景下,Codegen 源代码编译的 TM CPU 占比较高,同时耗时较大。为了避免重复编译,当前的 Codegen 缓存机制会根据 Codegen 源代码的 Class Name 映射到 Task 所用的 Classloader,再映射到编译好的 Class 中,一定程度上缓解了该问题。但在当前缓存机制下,存在两个明显的问题:

  • 当前的机制只实现了同一个作业内部,同一个 Task 不同并发的复用,但是对于同一个 Query 的多次执行,依然存在重复编译;
  • 每次编译和加载 Class 都会创建一个新的 ByteArrayClassloader,频繁创建 Classloader 会导致 Metaspace 碎片严重,并引发 Metaspace Full GC,造成服务耗时的抖动。

15

为了避免跨作业代码的重复编译,实现跨作业的 Class 共享,需要优化缓存逻辑,实现相同源代码到编译 Class 的映射。存在以下两个难点:

如何保证相同逻辑的算子所生成的代码相同?

在 Codegen 代码生成的时候,把类名和变量名中的自增 ID,从全局粒度替换为 local context 粒度,使相同逻辑的算子能生成相同的代码。

如何设计 cache key 唯一识别相同的代码?

通过设计基于 Classloader 的 Hash 值 + Class Name + 代码的长度 + 代码的 MD5 值的四元组。并将其作为 cache key 来唯一识别相同的代码。

Codegen 缓存优化的效果非常明显,TM 侧代码编译的 CPU 使用率 46% -> 0.3%,Query 的 E2E Latency 降低了 29.2%,同时 Metaspace Full GC 的时间也降低了 71.5%。

16

在优化 Task 部署性能时,通过火焰图发现,TM Task 初始化阶段的 CPU 占用比较高,进一步分析发现在做 Task 部署信息的反序列化时,同一个 Task 的多个 Subtask 存在冗余的反序列化。Task 部署信息 TaskInfo 主要包含 Head Operator、Chained Operators 信息,在作业构建时会分别被序列化为 TaskInfo 中的 SerializedUDF 和 ChainedTaskConfig。为了减少冗余的反序列化,有以下两个可优化的方向:

其一是 Chained Operators 的嵌套序列化结构,主要是去掉对 Map 结构不必要的序列化和反序列化,使得同一 Task 的多个 Subtask 可以复用同一个反序列化后的 Map。

17

其二是 Codegen 算子的优化,在占比较大的 Codegen 算子在初始化时,也存在较高的反序列化开销。经过分析,该类算子部署信息主要包含 Codegen 源代码,但是一个 TM 上的多个 Subtask 都需要反序列化一遍同样的源代码,存在大量的冗余,因此把 Codegen 源代码拆分出来,单独反序列化一遍后,给所有 Subtask 共享。

以上反序列化优化的效果非常明显,在同一个 Task 的 Subtask 个数等于 3 的时候,TaskInfo 整体的序列化和反序列化 QPS 分别提升了 102% 和 163%。

18

  • Join Probe 提前输出:Probe / Full Outer Hash Join 支持在 Probe 阶段,基于 Build 端的 Bloom Filter 提前输出结果,减少 Probe 端数据的落盘,从而提升性能。

  • 内存池化:在算子启动的时候,从 Managed Memory 申请内存,并初始化内存分片。在 OLAP 场景下,这部分的时间和资源消耗占比较大,因此支持了 Cached Memory Pool,即在 TM 维度内共享内存池,而不需要在算子启动的时候初始化内存。

  • 内存使用优化:在并行执行包含大量 Aggregate / Join 算子的 Query 时,发现即使数据量非常小,TM 的Managed Memory 使用也很高。经过排查,对于需要使用 Managed Memory 的算子,每次申请内存的步长是 16 MB,因此这些算子的每个并发都至少需要申请 16 MB 内存,导致内存的实际利用率很低,因此支持了可配置步长,并设置较小的默认值以节省大量内存。

三、集群运维和稳定性建设

19

构建运维发版流程:在进行完善的测试后,使用自动化流水线,对上下游依赖的所有组件统一发版,最后对线上集群进行平滑的升级。

完善测试方式:支持 CI、准确性测试、性能测试、长稳和故障测试。CI 可以及时发现 UT 失败的问题;准确性测试选择 Query 丰富的 TPC-DS 测试集;性能测试主要包括 TPC-H 性能测试和调度 QPS 性能测试;此外,由于在线服务对稳定性要求比较高,因此支持了长稳和故障测试,在服务长时间运行,并注入各种故障场景的情况下,判断集群的状态、测试 Query 的执行结果等是否符合预期。其中故障测试包含了丰富的故障场景,包括异常 SQL,JM / TM 退出和网络故障等,帮助发现内存泄露等问题,提高了服务的稳定性。

平滑升级线上集群:支持 SQL Gateway 滚动升级。具体的实现过程是通过先启动一个新版本的 Flink 集群,再把线上的多个 Gateway 实例逐个滚动地切流到新的集群,实现无感升级,使得服务中断时间从之前的 5 min 降低到接近为 0。同时在滚动切流时,会进行小流量验证,在发现问题后能够快速回滚,降低上线风险。

20

监控体系的完善过程中,除了流批的集群监控,比如对 CPU 等资源使用的监控、GC 时间等进程状态的监控外,还增加了细粒度的 CPU 监控,用于明确在短 Query 的情况下,集群是否存在 CPU 瓶颈。与此同时,通过增加查询负载监控,判断业务负载和 Flink 集群的负载是否正常。

在集群监控之外,又增加了 OLAP 下所特有的作业监控,完善了全链路的 Latency,方便快速定位慢查询出现耗时问题的阶段,比如 Parse、Optimize、Job 执行阶段等。此外,还增加了更多的慢查询和失败查询的监控,以及对依赖的外部 IO 的监控等。

21

Flink OLAP 作为在线服务对稳定性要求很高,但是在落地初期,由于服务缺乏容灾、JM / TM FGC 频繁等问题,线上稳定性较差。我们分别从 HA、限流、GC 优化和 JM 稳定性提升四个方面进行治理。

  • HA:支持双机房热备,提高在线服务的可用性。支持双机房容灾后,可以通过切流快速恢复。其次,通过支持 JM HA,解决 JM 单点的问题,提升线上服务的可用性。
  • 限流和熔断:虽然在流式和批式下,没有作业的限流需求,但在 OLAP 场景下,用户会持续提交 Query。为了避免查询高峰集群被打挂,支持了 SQL Gateway 的 QPS 限流。为了避免多作业同时运行导致的 JM 和 TM 的负载过高、查询过慢的问题,我们限制了 Flink 集群最大运行的作业数。除了限流之外,还支持了在 OLAP 下,使用 Failfast 的 Failover 策略,避免失败作业堆积,造成集群雪崩。
  • GC 优化:OLAP 场景下,业务对 Latency 非常敏感,Full GC 会导致耗时抖动。因此优化了 JM 和 TM 的 Full GC。首先移除 Task / Operator 级别的 Metric,使 JM 的 Full GC 频率降低 88%。其次,支持 Codegen 缓存优化,使 TM 的 Metaspace Full GC 次数降低为接近 0。
  • JM 稳定性提升:在 OLAP 场景下,支持 JobMaster 去除 ZK 依赖,因为在高 QPS 下,ZK 依赖会导致作业的 Latency 抖动。同时限制 Flink UI 展示的作业数,因为在 OLAP 场景下持续提交大量的作业,会使整个 JM 的内存过大,影响 JM 的稳定性。与此同时,关闭 Flink UI 的自动刷新,避免自动刷新导致 JM 负载上升引起页面的卡顿。

四、收益

22

Benchmark 收益:通过上述对 Query Optimizer 和 Query Executor 的查询优化,在 TPCH 100G 的Benchmark 中,Query Latency 降低了50.1%。其次对三类不同复杂程度的小数据量查询(点查类 Source-Sink、较复杂的 WordCount 和更加复杂的三表 Join),进行了 E2E Benchmark,优化效果非常明显,E2E QPS 平均提升 25 倍,E2E Latency 平均降低 92%,降低了超过 10倍。

23

业务收益:性能和稳定性都有明显的提高。性能方面,Job 平均 Latency 降低了 48.3%,TM 平均 CPU 降低了27.3%;稳定性方面,JM Full GC 频率降低了 88%,TM Full GC 时间降低了 71.5%。

五、未来规划

  • 产品化完善:包括 History Server 的支持和慢查询的智能分析。
  • 向量化引擎:充分利用 CPU 的并行化能力,提升计算性能。
  • 物化视图:对于大数据量的计算,现查现算的耗时和资源开销都非常大,所以未来考虑引入物化视图加速用户的查询,节省资源使用。
  • Optimizer 演进:持续跟进业界和学术界的最新进展,比如基于 Learning-based 实现 SQL Optimization 的 AI4DB 等。

点击查看原文视频 & 演讲PPT


更多内容

img


活动推荐

阿里云基于 Apache Flink 构建的企业级产品-实时计算 Flink 版现开启活动:
0 元试用 实时计算 Flink 版(5000CU*小时,3 个月内)
了解活动详情:https://free.aliyun.com/?pipCode=sc

image.png

相关实践学习
基于Hologres+Flink搭建GitHub实时数据大屏
通过使用Flink、Hologres构建实时数仓,并通过Hologres对接BI分析工具(以DataV为例),实现海量数据实时分析.
实时计算 Flink 实战课程
如何使用实时计算 Flink 搞定数据处理难题?实时计算 Flink 极客训练营产品、技术专家齐上阵,从开源 Flink功能介绍到实时计算 Flink 优势详解,现场实操,5天即可上手! 欢迎开通实时计算 Flink 版: https://cn.aliyun.com/product/bigdata/sc Flink Forward Asia 介绍: Flink Forward 是由 Apache 官方授权,Apache Flink Community China 支持的会议,通过参会不仅可以了解到 Flink 社区的最新动态和发展计划,还可以了解到国内外一线大厂围绕 Flink 生态的生产实践经验,是 Flink 开发者和使用者不可错过的盛会。 去年经过品牌升级后的 Flink Forward Asia 吸引了超过2000人线下参与,一举成为国内最大的 Apache 顶级项目会议。结合2020年的特殊情况,Flink Forward Asia 2020 将在12月26日以线上峰会的形式与大家见面。
相关文章
|
8月前
|
存储 消息中间件 OLAP
基于 Flink+Paimon+Hologres 搭建淘天集团湖仓一体数据链路
本文整理自淘天集团高级数据开发工程师朱奥在Flink Forward Asia 2024的分享,围绕实时数仓优化展开。内容涵盖项目背景、核心策略、解决方案、项目价值及未来计划五部分。通过引入Paimon和Hologres技术,解决当前流批存储不统一、实时数据可见性差等痛点,实现流批一体存储与高效近实时数据加工。项目显著提升了数据时效性和开发运维效率,降低了使用门槛与成本,并规划未来在集团内推广湖仓一体架构,探索更多技术创新场景。
1610 3
基于 Flink+Paimon+Hologres 搭建淘天集团湖仓一体数据链路
|
6月前
|
SQL 分布式计算 DataWorks
破界·融合·进化:解码DataWorks与Hologres的湖仓一体实践
基于阿里云DataWorks与实时数仓Hologres,提供统一的大数据开发治理平台与全链路实时分析能力。DataWorks支持多行业数据集成与管理,Hologres实现海量数据的实时写入与高性能查询分析,二者深度融合,助力企业构建高效、实时的数据驱动决策体系,加速数字化升级。
|
9月前
|
存储 监控 数据挖掘
京东物流基于Flink & StarRocks的湖仓建设实践
本文整理自京东物流高级数据开发工程师梁宝彬在Flink Forward Asia 2024的分享,聚焦实时湖仓的探索与建设、应用实践、问题思考及未来展望。内容涵盖京东物流通过Flink和Paimon等技术构建实时湖仓体系的过程,解决复杂业务场景下的数据分析挑战,如多维OLAP分析、大屏监控等。同时,文章详细介绍了基于StarRocks的湖仓一体方案,优化存储成本并提升查询效率,以及存算分离的应用实践。最后,对未来数据服务的发展方向进行了展望,计划推广长周期数据存储服务和原生数据湖建设,进一步提升数据分析能力。
909 1
京东物流基于Flink & StarRocks的湖仓建设实践
|
4月前
|
存储 JSON 数据处理
Flink基于Paimon的实时湖仓解决方案的演进
本文源自Apache CommunityOverCode Asia 2025,阿里云专家苏轩楠分享Flink与Paimon构建实时湖仓的演进实践。深度解析Variant数据类型、Lookup Join优化等关键技术,提升半结构化数据处理效率与系统可扩展性,推动实时湖仓在生产环境的高效落地。
564 1
Flink基于Paimon的实时湖仓解决方案的演进
|
9月前
|
存储 SQL 运维
中国联通网络资源湖仓一体应用实践
本文分享了中国联通技术专家李晓昱在Flink Forward Asia 2024上的演讲,介绍如何借助Flink+Paimon湖仓一体架构解决传统数仓处理百亿级数据的瓶颈。内容涵盖网络资源中心概况、现有挑战、新架构设计及实施效果。新方案实现了数据一致性100%,同步延迟从3小时降至3分钟,存储成本降低50%,为通信行业提供了高效的数据管理范例。未来将深化流式数仓与智能运维融合,推动数字化升级。
465 0
中国联通网络资源湖仓一体应用实践
|
5月前
|
SQL 存储 运维
Apache Doris 在菜鸟的大规模湖仓业务场景落地实践
本文介绍了 Apache Doris 在菜鸟的大规模落地的实践经验,菜鸟为什么选择 Doris,以及 Doris 如何在菜鸟从 0 开始,一步步的验证、落地,到如今上万核的规模,服务于各个业务线,Doris 已然成为菜鸟 OLAP 数据分析的最优选型。
393 2
Apache Doris 在菜鸟的大规模湖仓业务场景落地实践
|
4月前
|
存储 人工智能 监控
淘宝闪购基于Flink&Paimon的Lakehouse生产实践:从实时数仓到湖仓一体化的演进之路
本文整理自淘宝闪购(饿了么)大数据架构师王沛斌在 Flink Forward Asia 2025 上海站的分享,深度解析其基于 Apache Flink 与 Paimon 的 Lakehouse 架构演进与落地实践,涵盖实时数仓发展、技术选型、平台建设及未来展望。
1032 0
淘宝闪购基于Flink&Paimon的Lakehouse生产实践:从实时数仓到湖仓一体化的演进之路
|
7月前
|
资源调度 Kubernetes 流计算
Flink在B站的大规模云原生实践
本文基于哔哩哔哩资深开发工程师丁国涛在Flink Forward Asia 2024云原生专场的分享,围绕Flink On K8S的实践展开。内容涵盖五个部分:背景介绍、功能及稳定性优化、性能优化、运维优化和未来展望。文章详细分析了从YARN迁移到K8S的优势与挑战,包括资源池统一、环境一致性改进及隔离性提升,并针对镜像优化、Pod异常处理、启动速度优化等问题提出解决方案。此外,还探讨了多机房容灾、负载均衡及潮汐混部等未来发展方向,为Flink云原生化提供了全面的技术参考。
443 9
Flink在B站的大规模云原生实践
|
8月前
|
SQL 存储 NoSQL
Flink x Paimon 在抖音集团生活服务的落地实践
本文整理自抖音集团数据工程师陆魏与流式计算工程冯向宇在Flink Forward Asia 2024的分享,聚焦抖音生活服务业务中的实时数仓技术演变及Paimon湖仓实践。文章分为三部分:背景及现状、Paimon湖仓实践与技术优化。通过引入Paimon,解决了传统实时数仓开发效率低、资源浪费、稳定性差等问题,显著提升了开发运维效率、节省资源并增强了任务稳定性。同时,文中详细探讨了Paimon在维表实践、宽表建设、标签变更检测等场景的应用,并介绍了其核心技术优化与未来规划。
848 10
Flink x Paimon 在抖音集团生活服务的落地实践

相关产品

  • 实时计算 Flink版