Flink SQL 性能优化:multiple input 详解

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 在 Flink 1.12 中,针对目前 operator chaining 无法覆盖的场景,推出了 multiple input operator 与 source chaining 优化。该优化将消除 Flink 作业中大多数冗余 shuffle,进一步提高作业的执行效率。本文将以一个 SQL 作业为例介绍上述优化,并展示 Flink 1.12 在 TPC-DS 测试集上取得的成果。

作者|贺小令、翁才智

执行效率的优化一直是 Flink 追寻的目标。在大多数作业,特别是批作业中,数据通过网络在 task 之间传递(称为数据 shuffle)的代价较大。正常情况下一条数据经过网络需要经过序列化、磁盘读写、socket 读写与反序列化等艰难险阻,才能从上游 task 传输到下游;而相同数据在内存中的传输,仅需要耗费几个 CPU 周期传输一个八字节指针即可。

Flink 在早期版本中已经通过 operator chaining 机制,将并发相同的相邻单输入算子整合进同一个 task 中,消除了单输入算子之间不必要的网络传输。然而,join 等多输入算子之间同样存在额外的数据 shuffle 问题,shuffle 数据量最大的 source 节点与多输入算子之间的数据传输也无法利用 operator chaining 机制进行优化。

在 Flink 1.12 中,我们针对目前 operator chaining 无法覆盖的场景,推出了 multiple input operator 与 source chaining 优化。该优化将消除 Flink 作业中大多数冗余 shuffle,进一步提高作业的执行效率。本文将以一个 SQL 作业为例介绍上述优化,并展示 Flink 1.12 在 TPC-DS 测试集上取得的成果。

优化案例解析:订单量统计

我们将以 TPC-DS q96 为例子详细介绍如何消除冗余 shuffle,该 SQL 意在通过多路 join 筛选并统计符合特定条件的订单量。

select count(*) 
from store_sales
    ,household_demographics 
    ,time_dim, store
where ss_sold_time_sk = time_dim.t_time_sk   
    and ss_hdemo_sk = household_demographics.hd_demo_sk 
    and ss_store_sk = s_store_sk
    and time_dim.t_hour = 8
    and time_dim.t_minute >= 30
    and household_demographics.hd_dep_count = 5
    and store.s_store_name = 'ese'

image.png

图 1 - 初始执行计划

冗余 Shuffle 是如何产生的?

由于部分算子对输入数据的分布有要求(如 hash join 算子要求同一并发内数据 join key 的 hash 值相同),数据在算子之间传递时可能需要经过重新排布与整理。与 map-reduce 的 shuffle 过程类似,Flink shuffle 将上游 task 产生的中间结果进行整理,并按需发送给需要这些中间结果的下游 task。但在一部分情况下,上游产出的数据已经满足了数据分布要求(如连续多个 join key 相同的 hash join 算子),此时对数据的整理便不再必要,由此产生的 shuffle 也就成为了冗余 shuffle,在执行计划中以 forward shuffle 表示。

图 1 中的 hash join 算子是一种称为 broadcast hash join 的特殊算子。以 store_sales join time_dim 为例,由于 time_dim 表数据量很小,此时通过 broadcast shuffle 将该表的全量数据发送给 hash join 的每个并发,就能让任何并发接受 store_sales 表的任意数据而不影响 join 结果的正确性,同时提高 hash join 的执行效率。此时 store_sales 表向 join 算子的网络传输也成为了冗余 shuffle。同理几个 join 之间的 shuffle 也是不必要的。

image.png

图 2 - 冗余的shuffle(红框标记)

除 hash join 与 broadcast hash join 外,产生冗余 shuffle 的场景还有很多,例如 group key 与 join key 相同的 hash aggregate + hash join、group key 具有包含关系的多个 hash aggregate 等等,这里不再展开描述。

Operator Chaining 能解决吗?

对 Flink 优化过程有一定了解的读者可能会知道,为了消除不必要的 forward shuffle,Flink 在早期就已经引入了 operator chaining 机制。该机制将并发相同的相邻单输入算子整合进同一个 task 中,并在同一个线程中一起运算。Operator chaining 机制在图 1 中其实已经在发挥作用,如果没有它,做 broadcast shuffle 的三个 Source 节点名称中被“->”分隔的算子将会被拆分至多个不同的 task,产生冗余的数据 shuffle。图 3 为 Operator chaining 关闭是的执行计划。

image.png

图 3 - Operator chaining关闭后的执行计划

减少数据在 TM 之间通过网络和文件传输并将算子链接合并入 task 是非常有效的优化:它能减少线程之间的切换,减少消息的序列化与反序列化,减少数据在缓冲区的交换,并减少延迟的同时提高整体吞吐量。然而,operator chaining 对算子的整合有非常严格的限制,其中一条就是“下游算子的入度为 1”,也就是说下游算子只能有一路输入。这就将多路输入的算子(如 join)排除在外。

多输入算子的解决方案:Multiple Input Operator

如果我们能仿照 operator chaining 的优化思路,引入新的优化机制并满足以下条件:

  1. 该机制可以组合多输入的算子;
  2. 该机制支持多路输入(为被组合的算子提供输入)

我们就可以将用 forward shuffle 连接的的多输入算子放到一个 task 里执行,从而消除不必要的 shuffle。Flink 社区很早就关注到了 operator chaining 的不足,在 Flink 1.11 中引入了 streaming api 层的 MultipleInputTransformation 以及对应的 MultipleInputStreamTask。这些 api 满足了上述条件 2,而 Flink 1.12 在此基础上在 SQL 层中实现了满足条件 1 的新算子——multiple input operator,可以参考 FLIP 文档[1]。

Multiple input operator 是 table 层一个可插拔的优化。它位于 table 层优化的最后一步,遍历生成的执行计划并将不被 exchange 阻隔的相邻算子整合进一个 multiple input operator 中。图 4 展示了该优化对原本 SQL 优化步骤的修改。

image.png

图 4 - 加入 multiple input operator 后的优化步骤

读者可能会有疑问:为什么不在现有的 operator chaining 上进行修改,而要另起炉灶呢?实际上,multiple input operator 除了要完成 operator chaining 的工作之外,还需要对各个输入的优先级进行排序。这是因为一部分多输入算子(如 hash join 与 nested loop join)对输入有严格的顺序限制,若输入优先级排序不当很可能造成死锁。由于算子输入优先级的信息仅在 table 层的算子中有描述,更加自然的方式是在 table 层引入该优化机制。

值得注意的是,multiple input operator 不同于管理多个 operator 的 operator chaining,其本身就是一整个大 operator,而其内部运算在外界看来就是一个黑盒。Multiple input operator 的内部结构在 operator name 中完全体现,读者在运行包含该 operator 的作业时,可以从 operator name 看到哪些算子以怎样的拓扑结构被组合进了 multiple input operator 中。

图 5 展示了经过 multiple input 优化后的算子的拓扑图以及 multiple input operator 的透视图。图中三个 hash join 算子之间的冗余的 shuffle 被移除后,它们可以在一个 task 里执行,只不过 operator chaining 没法处理这种多输入的情况,将它们放到 multiple input operator 里执行,由 multiple input operator 管理各个算子的输入顺序和算子之间的调用关系。

image.png

图 5 - 经过 multiple input 优化后的算子拓扑图

Multiple input operator 的构建和运行过程较为复杂,对此细节有兴趣的读者可以参考设计文档[2]。

Source 也不能遗漏:Source Chaining

经过 multiple input operator 的优化,我们将图 1 中的执行计划优化为图 6,图 3 经过 operator chaining 优化后就变为图 6 的执行图。

image.png

图 6 - 经过 multiple input operator 优化后的执行计划

图 6 中从 store_sales 表产生的 forward shuffle(如红框所示)表示我们仍有优化空间。正如序言中所说,在大部分作业中,从 source 直接产生的数据由于没有经过 join 等算子的筛选和加工,shuffle 的数据量是最大的。以 10T 数据下的 TPC-DS q96 为例,如果不进行进一步优化,包含 store_sales 源表的 task 将向网络中传输 1.03T 的数据,而经过一次 join 的筛选后,数据量急速下降至 16.5G。如果我们能将源表的 forward shuffle 省去,作业整体执行效率又能前进一大步。

可惜的是,multiple input operator 也不能覆盖 source shuffle 的场景,这是因为 source 不同于其它任何算子,它没有任何输入。Flink 1.12 为此给 operator chaining 新增了 source chaining 功能,将不被 shuffle 阻隔的 source 合并到 operator chaining 中,省去了 source 与下游算子之间的 forward shuffle。

目前仅有 FLIP-27 source 以及 multiple input operator 可以利用 source chaining 功能,不过这已经足够解决本文中的优化场景。

结合 multiple input operator 与 source chaining 之后,图 7 展示了本文优化案例的最终执行方案。

image.png

图 7 - 优化后的执行方案

TPC-DS 测试结果

Multiple input operator 与 source chaining 对大部分作业,特别是批作业有显著的优化效果。我们利用 TPC-DS 测试集对 Flink 1.12 的整体性能进行了测试,与 Flink 1.10 公布的 12267s 总用时相比,Flink 1.12 的总用时仅为 8708s,缩短了近 30% 的运行时间!

image.png

图 8 - TPC-DS 测试集总用时对比

image.png

图 9 - TPC-DS 部分测试点用时对比

未来计划

通过 TPC-DS 的测试效果看到,source chaining + multiple input 能够给我们带来很大的性能提升。目前整体框架已完成,常用批算子已支持消除冗余 exchange 的推导逻辑,后续我们将支持更多的批算子和更精细的推导算法。

流作业的数据 shuffle 虽然不需要像批作业一样将数据写入磁盘,但将网络传输变为内存传输带来的性能提升也是非常可观的,因此流作业支持 source chaining + multiple input 也是一个非常令人期待的优化。同时,在流作业上支持该优化还需要很多工作,例如流算子上消除冗余 exchange 的推导逻辑暂未支持,一些算子需要重构以消除输入数据是 binary 的要求等等,这也是为什么 Flink 1.12 暂未在流作业中推出推出该优化的原因。后续版本我们将逐步完成这些工作,也希望更多社区的力量加入我们一起尽早的将更多的优化落地。

另外,阿里云实时计算团队围绕 Apache Flink 为核心打造的实时大数据平台,在阿里巴巴内部提供全集团范围的流批一体数据分析服务,同时也通过阿里云向外界提供 Flink 企业级云产品,服务广大中小企业。我们的技术团队围绕开源大数据技术体系构建,包括来自 Apache Flink/Hadoop/HBase/Kafka/Hive/Druid 等多个顶级开源项目的众多 PMC/Committer 成员,加入实时计算团队将可以与众多技术大神共同探索大数据技术世界,感兴趣的同学请速联系:kete.yangkt@alibaba-inc.com。

参考链接:

[1]https://cwiki.apache.org/confluence/display/FLINK/FLIP-92%3A+Add+N-Ary+Stream+Operator+in+Flink
[2]https://docs.google.com/document/d/1qKVohV12qn-bM51cBZ8Hcgp31ntwClxjoiNBUOqVHsI/

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
2月前
|
SQL 存储 API
Flink实践:通过Flink SQL进行SFTP文件的读写操作
虽然 Apache Flink 与 SFTP 之间的直接交互存在一定的限制,但通过一些创造性的方法和技术,我们仍然可以有效地实现对 SFTP 文件的读写操作。这既展现了 Flink 在处理复杂数据场景中的强大能力,也体现了软件工程中常见的问题解决思路——即通过现有工具和一定的间接方法来克服技术障碍。通过这种方式,Flink SQL 成为了处理各种数据源,包括 SFTP 文件,在内的强大工具。
157 15
|
1月前
|
SQL 监控 Oracle
Oracle SQL性能优化全面指南
在数据库管理领域,Oracle SQL性能优化是确保数据库高效运行和数据查询速度的关键
|
2月前
|
存储 SQL 关系型数据库
【MySQL调优】如何进行MySQL调优?从参数、数据建模、索引、SQL语句等方向,三万字详细解读MySQL的性能优化方案(2024版)
MySQL调优主要分为三个步骤:监控报警、排查慢SQL、MySQL调优。 排查慢SQL:开启慢查询日志 、找出最慢的几条SQL、分析查询计划 。 MySQL调优: 基础优化:缓存优化、硬件优化、参数优化、定期清理垃圾、使用合适的存储引擎、读写分离、分库分表; 表设计优化:数据类型优化、冷热数据分表等。 索引优化:考虑索引失效的11个场景、遵循索引设计原则、连接查询优化、排序优化、深分页查询优化、覆盖索引、索引下推、用普通索引等。 SQL优化。
519 15
【MySQL调优】如何进行MySQL调优?从参数、数据建模、索引、SQL语句等方向,三万字详细解读MySQL的性能优化方案(2024版)
|
1月前
|
SQL 数据挖掘 数据库
SQL查询每秒的数据:技巧、方法与性能优化
id="">SQL查询功能详解 SQL(Structured Query Language,结构化查询语言)是一种专门用于与数据库进行沟通和操作的语言
|
1月前
|
SQL 大数据 API
大数据-132 - Flink SQL 基本介绍 与 HelloWorld案例
大数据-132 - Flink SQL 基本介绍 与 HelloWorld案例
43 0
|
2月前
|
SQL 安全 数据处理
揭秘数据脱敏神器:Flink SQL的神秘力量,守护你的数据宝藏!
【9月更文挑战第7天】在大数据时代,数据管理和处理尤为重要,尤其在保障数据安全与隐私方面。本文探讨如何利用Flink SQL实现数据脱敏,为实时数据处理提供有效的隐私保护方案。数据脱敏涉及在处理、存储或传输前对敏感数据进行加密、遮蔽或替换,以遵守数据保护法规(如GDPR)。Flink SQL通过内置函数和表达式支持这一过程。
71 2
|
2月前
|
SQL 大数据 数据处理
奇迹降临!解锁 Flink SQL 简单高效的终极秘籍,开启数据处理的传奇之旅!
【9月更文挑战第7天】在大数据处理领域,Flink SQL 因其强大功能与简洁语法成为开发者首选。本文分享了编写高效 Flink SQL 的实用技巧:理解数据特征及业务需求;灵活运用窗口函数(如 TUMBLE 和 HOP);优化连接操作,优先采用等值连接;合理选择数据类型以减少计算资源消耗。结合实际案例(如实时电商数据分析),并通过定期性能测试与调优,助力开发者在大数据处理中更得心应手,挖掘更多价值信息。
46 1
|
2月前
|
关系型数据库 MySQL 网络安全
5-10Can't connect to MySQL server on 'sh-cynosl-grp-fcs50xoa.sql.tencentcdb.com' (110)")
5-10Can't connect to MySQL server on 'sh-cynosl-grp-fcs50xoa.sql.tencentcdb.com' (110)")
|
4月前
|
SQL 存储 监控
SQL Server的并行实施如何优化?
【7月更文挑战第23天】SQL Server的并行实施如何优化?
103 13
|
4月前
|
SQL
解锁 SQL Server 2022的时间序列数据功能
【7月更文挑战第14天】要解锁SQL Server 2022的时间序列数据功能,可使用`generate_series`函数生成整数序列,例如:`SELECT value FROM generate_series(1, 10)。此外,`date_bucket`函数能按指定间隔(如周)对日期时间值分组,这些工具结合窗口函数和其他时间日期函数,能高效处理和分析时间序列数据。更多信息请参考官方文档和技术资料。

相关产品

  • 实时计算 Flink版
  • 下一篇
    无影云桌面