Flink SQL 在字节跳动的优化与实践

本文涉及的产品
实时计算 Flink 版,1000CU*H 3个月
简介: Flink 在字节的应用实战

整理 | Aven (Flink 社区志愿者)

摘要:本文由 Apache Flink Committer,字节跳动架构研发工程师李本超分享,以四个章节来介绍 Flink 在字节的应用实战。 内容如下:

  • 整体介绍
  • 实践优化
  • 流批一体
  • 未来规划

一、整体介绍

2018 年 12 月 Blink 宣布开源,经历了约一年的时间 Flink 1.9 于 2019 年 8 月 22 发布。在 Flink 1.9 发布之前字节跳动内部基于 master 分支进行内部的 SQL 平台构建。经历了 2~3 个月的时间字节内部在 19 年 10 月份发布了基于 Flink 1.9 的 Blink planner 构建的 Streaming SQL 平台,并进行内部推广。在这个过程中发现了一些比较有意思的需求场景,以及一些较为奇怪的 BUG。

基于 1.9 的 Flink SQL 扩展

虽然最新的 Flink 版本已经支持 SQL 的 DDL,但 Flink 1.9 并不支持。字节内部基于 Flink 1.9 进行了 DDL 的扩展支持以下语法:

  • create table
  • create view
  • create function
  • add resource

同时 Flink 1.9 版本不支持的 watermark 定义在 DDL 扩展后也支持了。

我们在推荐大家尽量的去用 SQL 表达作业时收到很多“SQL 无法表达复杂的业务逻辑”的反馈。时间久了发现其实很多用户所谓的复杂业务逻辑有的是做一些外部的 RPC 调用,字节内部针对这个场景做了一个 RPC 的维表和 sink,让用户可以去读写 RPC 服务,极大的扩展了 SQL 的使用场景,包括 FaaS 其实跟 RPC 也是类似的。在字节内部添加了 Redis/Abase/Bytable/ByteSQL/RPC/FaaS 等维表的支持。

同时还实现了多个内部使用的 connectors:

  1. source: RocketMQ
  2. sink:
    RocketMQ/ClickHouse/Doris/LogHouse/Redis/Abase/Bytable/ByteSQL/RPC/Print/Metrics

并且为 connector 开发了配套的 format:PB/Binlog/Bytes。

在线的界面化 SQL 平台

2.png

除了对 Flink 本身功能的扩展,字节内部也上线了一个 SQL 平台,支持以下功能:

  • SQL 编辑
  • SQL 解析
  • SQL 调试
  • 自定义 UDF 和 Connector
  • 版本控制
  • 任务管理

二、实践优化

除了对功能的扩展,针对 Flink 1.9 SQL 的不足之处也做了一些优化。

Window 性能优化

1、支持了 window Mini-Batch

Mini-Batch 是 Blink planner 的一个比较有特色的功能,其主要思想是积攒一批数据,再进行一次状态访问,达到减少访问状态的次数降低序列化反序列化的开销。这个优化主要是在 RocksDB 的场景。如果是 Heap 状态 Mini-Batch 并没什么优化。在一些典型的业务场景中,得到的反馈是能减少 20~30% 左右的 CPU 开销。

2、扩展 window 类型

目前 SQL 中的三种内置 window,滚动窗口、滑动窗口、session 窗口,这三种语意的窗口无法满足一些用户场景的需求。比如在直播的场景,分析师想统计一个主播在开播之后,每一个小时的 UV(Unique Visitor)、GMV(Gross Merchandise Volume) 等指标。自然的滚动窗口的划分方式并不能够满足用户的需求,字节内部就做了一些定制的窗口来满足用户的一些共性需求。

-- my_window 为自定义的窗口,满足特定的划分方式
SELECT
room_id,
COUNT(DISTINCT user_id)
FROM MySource
GROUP BY
room_id,
my_window(ts, INTERVAL '1' HOURS)

3、window offset

这是一个较为通用的功能,在 Datastream API 层是支持的,但 SQL 中并没有。这里有个比较有意思的场景,用户想要开一周的窗口,一周的窗口变成了从周四开始的非自然周。因为谁也不会想到 1970 年 1 月 1 号那天居然是周四。在加入了 offset 的支持后就可以支持正确的自然周窗口。

SELECT
room_id,
COUNT(DISTINCT user_id)
FROM MySource
GROUP BY
room_id,
TUMBLE(ts, INTERVAL '7' DAY, INTERVAL '3', DAY)

维表优化

1、延迟 Join

维表 Join 的场景下因为维表经常发生变化尤其是新增维度,而 Join 操作发生在维度新增之前,经常导致关联不上。

所以用户希望如果 Join 不到,则暂时将数据缓存起来之后再进行尝试,并且可以控制尝试次数,能够自定义延迟 Join 的规则。这个需求场景不单单在字节内部,社区的很多同学也有类似的需求。

基于上面的场景实现了延迟 Join 功能,添加了一个可以支持延迟 Join 维表的算子。当 Join 没有命中,local cache 不会缓存空的结果,同时将数据暂时保存在一个状态中,之后根据设置定时器以及它的重试次数进行重试。

3.png

2、维表 Keyby 功能

4.png

通过拓扑我们发现 Cacl 算子和 lookUpJoin 算子是 chain 在一起的。因为它没有一个 key 的语义。

当作业并行度比较大,每一个维表 Join 的 subtask,访问的是所有的缓存空间,这样对缓存来说有很大的压力。

但观察 Join 的 SQL,等值 Join 是天然具有 Hash 属性的。直接开放了配置,运行用户直接把维表 Join 的 key 作为 Hash 的条件,将数据进行分区。这样就能保证下游每一个算子的 subtask 之间的访问空间是独立的,这样可以大大的提升开始的缓存命中率。

除了以上的优化,还有两点目前正在开发的维表优化。

1、广播维表:有些场景下维表比较小,而且更新不频繁,但作业的 QPS 特别高。如果依然访问外部系统进行 Join,那么压力会非常大。并且当作业 Failover 的时候 local cache 会全部失效,进而又对外部系统造成很大访问压力。那么改进的方案是定期全量 scan 维表,通过Join key hash 的方式发送到下游,更新每个维表 subtask 的缓存。
2、Mini-Batch:主要针对一些 I/O 请求比较高,系统又支持 batch 请求的能力,比如说 RPC、HBase、Redis 等。以往的方式都是逐条的请求,且 Async I/O 只能解决 I/O 延迟的问题,并不能解决访问量的问题。通过实现 Mini-Batch 版本的维表算子,大量降低维表关联访问外部存储次数。

Join 优化

目前 Flink 支持的三种 Join 方式;分别是 Interval Join、Regular Join、Temporal Table Function。

前两种语义是一样的流和流 Join。而 Temporal Table 是流和表的的 Join,右边的流会以主键的形式形成一张表,左边的流去 Join 这张表,这样一次 Join 只能有一条数据参与并且只返回一个结果。而不是有多少条都能 Join 到。

它们之间的区别列了几点:

5.png

可以看到三种 Join 方式都有它本身的一些缺陷。

  1. Interval Join 目前使用上的缺陷是它会产生一个 out join 数据和 watermark 乱序的情况。
  2. Regular Join 的话,它最大的缺陷是 retract 放大(之后会详细说明这个问题)。
  3. Temporal table function 的问题较其它多一些,有三个问题。
  • 不支持 DDl
  • 不支持 out join 的语义 (FLINK-7865 的限制)
  • 右侧数据断流导致 watermark 不更新,下游无法正确计算 (FLINK-18934)

对于以上的不足之处字节内部都做了对应的修改。

增强 Checkpoint 恢复能力

对于 SQL 作业来说一旦发生条件变化都很难从 checkpoint 中恢复。

SQL 作业确实从 checkpoint 恢复的能力比较弱,因为有时候做一些看起来不太影响 checkpoint 的修改,它仍然无法恢复。无法恢复主要有两点;

  • 第一点:operate ID 是自动生成的,然后因为某些原因导致它生成的 ID 改变了。
  • 第二点:算子的计算的逻辑发生了改变,即算子内部的状态的定义发生了变化。

例子1:并行度发生修改导致无法恢复。

6.png

source 是一个最常见的有状态的算子,source 如果和之后的算子的 operator chain 逻辑发生了改变,是完全无法恢复的。

下图左上是正常的社区版的作业会产生的一个逻辑, source 和后面的并行度一样的算子会被 chain 在一起,用户是无法去改变的。但算子并行度是常会会发生修改,比如说 source 由原来的 100 修改为 50,cacl 的并发是 100。此时 chain 的逻辑就会发生变化。

7.png

针对这种情况,字节内部做了修改,允许用户去配置,即使 source 的并行度跟后面整体的作业的并行度是一样的,也让其不与之后的算子 chain 在一起。

例子2:DAG 改变导致无法恢复。

8.png

这是一种比较特殊的情况,有一条 SQL (上图),可以看到 source 没有发生变化,之后的三个聚合互相之间没有关系,状态竟然也是无法恢复。

作业之所以无法恢复,是因为 operator ID 生成规则导致的。目前 SQL 中 operator ID 的生成的规则与上游、本身配置以及下游可以 chain 在一起的算子的数量都有关系。 因为新增指标,会导致新增一个 Calc 的下游节点,进而导致 operator ID 发生变化。

为了处理这种情况,支持了一种特殊的配置模式,允许用户配置生成 operator ID 的时候可以忽略下游 chain 在一起算子数量的条件。

例子3:新增聚合指标导致无法恢复

这块是用户诉求最大的,也是最复杂的部分。用户期望新增一些聚合指标后,原来的指标要能从 checkpoint 中恢复。

9.png

可以看到图中左部分是 SQL 生成的算子逻辑。count,sum,sum,count,distinct 会以一个 BaseRow 的结构存储在 ValueState 中。distinct 比较特殊一些,还会单独存储在一个 MapState 中。

这导致了如新增或者减少指标,都会使原先的状态没办法从 ValueState 中正常恢复,因为 VauleState 中存储的状态 “schema” 和新的(修改指标后)的 “schema”不匹配,无法正常反序列化。

10.png

11.png

在讨论解决方案之前,我们先回顾一下正常的恢复流。先从 checkpoint 中恢复出状态的 serializer,再通过 serializer 把状态恢复。接下来 operator 去注册新的状态定义,新的状态定义会和原先的状态定义进行一个兼容性对比,如果是兼容则状态恢复成功,如果不兼容则抛出异常任务失败。

不兼容的另一种处理情况是允许返回一个 migration(实现两个不匹配类型的状态恢复)那么也可以恢复成功。

针对上面的流程做出对应的修改:

  1. 第一步使新旧 serializer 互相知道对方的信息,添加一个接口,且修改了 statebackend resolve compatibility 的过程,把旧的信息传递给新的,并使其获取整个 migrate 过程。
  2. 第二步判断新老之间是否兼容,如果不兼容是否需要做一次 migration。然后让旧的 serializer 去恢复一遍状态,并使用新的 serializer 写入新的状态。
  3. 对 aggregation 的代码生成进行处理,当发现 aggregation 拿到的是指标是 null,那么将做一些初始化的工作。

通过以上的修改基本就可以做到正常的,新增的聚合指标从拆开的方案恢复。

三、流批一体探索

业务现状

字节跳动内部对流批一体和业务推广之前,技术团队提前做了大量技术方面的探索。整体判断是 SQL 这一层是可以做到流批一体的语义,但实践中却又发现不少不同。

比如说流计算的 session window,或是基于处理时间的 window,在批计算中无法做到。同时 SQL 在批计算中一些复杂的 over window,在流计算中也没有对应的实现。

但这些特别的场景可能只占 10% 甚至更少,所以用 SQL 去落实流批一体是可行的。

12.png

流批一体

这张图是比较常见的和大多数公司里的架构都类似。这种架构有什么缺陷呢?

  1. 数据不同源:批任务一般会有一次前置处理任务,不管是离线的也好实时的也好,预先进过一层加工后写入 Hive。而实时任务是从 kafka 读取原始的数据,可能是 json 格式,也可能是 avro 等等。直接导致批任务中可执行的 SQL 在流任务中没有结果生成或者执行结果不对。
  2. 计算不同源:批任务一般是 Hive + Spark 的架构,而流任务基本都是基于 Flink。不同的执行引擎在实现上都会有一些差异,导致结果不一致。不同的执行引擎有不同的 API 定义 UDF,它们之间也是无法被公用的。大部分情况下都是维护两套基于不同 API 实现的相同功能的 UDF。

鉴于上面的问题,提出了基于 Flink 的流批一体架构来解决。

  1. 数据不同源:流式处理先通过 Flink 处理之后写入 MQ 供下游流式 Flink job 去消费,对于批式处理由 Flink 处理后流式写入到 Hive,再由批式的 Flink job 去处理。
  2. 引擎不同源:既然都是基于 Flink 开发的流式,批式 job,自然没有计算不同源问题,同时也避免了维护多套相同功能的 UDF。

基于 Flink 实现的流批一体架构:

13.png

业务收益

  1. 统一的 SQL:通过一套 SQL 来表达流和批计算两种场景,减少开发维护工作。
  2. 复用 UDF:流式和批式计算可以共用一套 UDF。这对业务来说是有积极意义的。
  3. 引擎统一:对于业务的学习成本和架构的维护成本都会降低很多。
  4. 优化统一:大部分的优化都是可以同时作用在流式和批式计算上,比如对 planner、operator 的优化流和批可以共享。

四、未来工作和规划

优化 retract 放大问题

14.png

什么是 retract 放大?

上图有 4 张表,第一张表进行去重操作 (Dedup),之后分别和另外三张表做 Join。逻辑比较简单,表 A 输入(A1),最后产出 (A1,B1,C1,D1) 的结果。

当表 A 输入一个 A2,因为 Dedup 算子,导致数据需要去重,则向下游发送一个撤回 A1 的操作 -(A1) 和一个新增 A2 的操作 +(A2)。第一个 Join 算子收到 -(A1) 后会将 -(A1) 变成 -(A1,B1) 和 +(null,B1)(为了保持它认为的正确语义) 发送到下游。之后又收到了 +(A2) ,则又向下游发送 -(null,B1) 和 +(A2,B1) 这样操作就放大了两倍。再经由下游的算子操作会一直被放大,到最终的 sink 输出可能会被放大 1000 倍之多。

15.png

如何解决?

将原先 retract 的两条数据变成一条 changelog 的格式数据,在算子之间传递。算子接收到 changelog 后处理变更,然后仅仅向下游发送一个变更 changelog 即可。

未来规划

image.png

1.功能优化
  • 支持所有类型聚合指标变更的 checkpoint 恢复能力
  • window local-global
  • 事件时间的 Fast Emit
  • 广播维表
  • 更多算子的 Mini-Batch 支持:维表,TopN,Join 等
  • 全面兼容 Hive SQL 语法
2.业务扩展
  • 进一步推动流式 SQL 达到 80%
  • 探索落地流批一体产品形态
  • 推动实时数仓标准化

社区二维码.png

相关实践学习
基于Hologres+Flink搭建GitHub实时数据大屏
通过使用Flink、Hologres构建实时数仓,并通过Hologres对接BI分析工具(以DataV为例),实现海量数据实时分析.
实时计算 Flink 实战课程
如何使用实时计算 Flink 搞定数据处理难题?实时计算 Flink 极客训练营产品、技术专家齐上阵,从开源 Flink功能介绍到实时计算 Flink 优势详解,现场实操,5天即可上手! 欢迎开通实时计算 Flink 版: https://cn.aliyun.com/product/bigdata/sc Flink Forward Asia 介绍: Flink Forward 是由 Apache 官方授权,Apache Flink Community China 支持的会议,通过参会不仅可以了解到 Flink 社区的最新动态和发展计划,还可以了解到国内外一线大厂围绕 Flink 生态的生产实践经验,是 Flink 开发者和使用者不可错过的盛会。 去年经过品牌升级后的 Flink Forward Asia 吸引了超过2000人线下参与,一举成为国内最大的 Apache 顶级项目会议。结合2020年的特殊情况,Flink Forward Asia 2020 将在12月26日以线上峰会的形式与大家见面。
相关文章
|
6月前
|
存储 监控 数据挖掘
京东物流基于Flink & StarRocks的湖仓建设实践
本文整理自京东物流高级数据开发工程师梁宝彬在Flink Forward Asia 2024的分享,聚焦实时湖仓的探索与建设、应用实践、问题思考及未来展望。内容涵盖京东物流通过Flink和Paimon等技术构建实时湖仓体系的过程,解决复杂业务场景下的数据分析挑战,如多维OLAP分析、大屏监控等。同时,文章详细介绍了基于StarRocks的湖仓一体方案,优化存储成本并提升查询效率,以及存算分离的应用实践。最后,对未来数据服务的发展方向进行了展望,计划推广长周期数据存储服务和原生数据湖建设,进一步提升数据分析能力。
580 1
京东物流基于Flink & StarRocks的湖仓建设实践
|
4月前
|
资源调度 Kubernetes 流计算
Flink在B站的大规模云原生实践
本文基于哔哩哔哩资深开发工程师丁国涛在Flink Forward Asia 2024云原生专场的分享,围绕Flink On K8S的实践展开。内容涵盖五个部分:背景介绍、功能及稳定性优化、性能优化、运维优化和未来展望。文章详细分析了从YARN迁移到K8S的优势与挑战,包括资源池统一、环境一致性改进及隔离性提升,并针对镜像优化、Pod异常处理、启动速度优化等问题提出解决方案。此外,还探讨了多机房容灾、负载均衡及潮汐混部等未来发展方向,为Flink云原生化提供了全面的技术参考。
245 9
Flink在B站的大规模云原生实践
|
5月前
|
SQL 存储 NoSQL
Flink x Paimon 在抖音集团生活服务的落地实践
本文整理自抖音集团数据工程师陆魏与流式计算工程冯向宇在Flink Forward Asia 2024的分享,聚焦抖音生活服务业务中的实时数仓技术演变及Paimon湖仓实践。文章分为三部分:背景及现状、Paimon湖仓实践与技术优化。通过引入Paimon,解决了传统实时数仓开发效率低、资源浪费、稳定性差等问题,显著提升了开发运维效率、节省资源并增强了任务稳定性。同时,文中详细探讨了Paimon在维表实践、宽表建设、标签变更检测等场景的应用,并介绍了其核心技术优化与未来规划。
504 10
Flink x Paimon 在抖音集团生活服务的落地实践
|
5月前
|
资源调度 Kubernetes 调度
网易游戏 Flink 云原生实践
本文分享了网易游戏在Flink实时计算领域的资源管理与架构演进经验,从Yarn到K8s云原生,再到混合云的实践历程。文章详细解析了各阶段的技术挑战与解决方案,包括资源隔离、弹性伸缩、自动扩缩容及服务混部等关键能力的实现。通过混合云架构,网易游戏显著提升了资源利用率,降低了30%机器成本,小作业计算成本下降40%,并为未来性能优化、流批一体及智能运维奠定了基础。
285 9
网易游戏 Flink 云原生实践
|
7月前
|
存储 运维 BI
万字长文带你深入广告场景Paimon+Flink全链路探索与实践
本文将结合实时、离线数据研发痛点和当下Paimon的特性,以实例呈现低门槛、低成本、分钟级延迟的流批一体化方案,点击文章阅读详细内容~
|
SQL Kubernetes Cloud Native
开发者社区精选直播合集(三十六)| Flink实践合集
Flink 作为业界公认为最好的流计算引擎,不仅仅局限于做流处理,而是一套兼具流、批、机器学习等多种计算功能的大数据引擎,以其高吞吐低延时的优异实时计算能力、支持海量数据的亚秒级快速响应帮助企业和开发者实现数据算力升级,并成为阿里、腾讯、滴滴、美团、字节跳动、Netflix、Lyft 等国内外知名公司建设实时计算平台的首选。
开发者社区精选直播合集(三十六)|  Flink实践合集
|
2月前
|
存储 分布式计算 数据处理
「48小时极速反馈」阿里云实时计算Flink广招天下英雄
阿里云实时计算Flink团队,全球领先的流计算引擎缔造者,支撑双11万亿级数据处理,推动Apache Flink技术发展。现招募Flink执行引擎、存储引擎、数据通道、平台管控及产品经理人才,地点覆盖北京、杭州、上海。技术深度参与开源核心,打造企业级实时计算解决方案,助力全球企业实现毫秒洞察。
391 0
「48小时极速反馈」阿里云实时计算Flink广招天下英雄
|
运维 数据处理 数据安全/隐私保护
阿里云实时计算Flink版测评报告
该测评报告详细介绍了阿里云实时计算Flink版在用户行为分析与标签画像中的应用实践,展示了其毫秒级的数据处理能力和高效的开发流程。报告还全面评测了该服务在稳定性、性能、开发运维及安全性方面的卓越表现,并对比自建Flink集群的优势。最后,报告评估了其成本效益,强调了其灵活扩展性和高投资回报率,适合各类实时数据处理需求。
|
11月前
|
存储 分布式计算 流计算
实时计算 Flash – 兼容 Flink 的新一代向量化流计算引擎
本文介绍了阿里云开源大数据团队在实时计算领域的最新成果——向量化流计算引擎Flash。文章主要内容包括:Apache Flink 成为业界流计算标准、Flash 核心技术解读、性能测试数据以及在阿里巴巴集团的落地效果。Flash 是一款完全兼容 Apache Flink 的新一代流计算引擎,通过向量化技术和 C++ 实现,大幅提升了性能和成本效益。
3323 73
实时计算 Flash – 兼容 Flink 的新一代向量化流计算引擎

相关产品

  • 实时计算 Flink版
  • 下一篇
    oss教程