待待深度探索 Flink SQL(二)| 学习笔记

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
全局流量管理 GTM,标准版 1个月
云解析 DNS,旗舰版 1个月
简介: 快速学习待待深度探索 Flink SQL。

开发者学堂课程【Apache Flink 入门到实战 - Flink 开源社区出品 待待深度探索 Flink SQL(二)】学习笔记,与课程紧密联系,让用户快速学习知识。

课程地址:https://developer.aliyun.com/learning/course/632/detail/10041


待待深度探索 Flink SQL(二)


三.  Blink Planner

1.blink planner 过程

image.png

Table API&SQL 解析验证:

在 Flink 1.9 中,Table API 进行了大量的重构,引入了一套新的 Operation,这套 Operation 主要是用来描述任务的 Logic Tree。

当 SQL 传输进来后,首先会去做 SQL 解析,SQL 解析完成之后,会得到 SqlNode Tree(抽象语法树),然后会紧接着去做 Validate(验证),验证时会去访问 FunctionManger 和 CatalogManger 

FunctionManger 主要是查询用户定义的 UDF,以及检查 UDF 是否合法 

CatalogManger 主要是检查这个 Table 或者 Database 是否存在,如果验证都通过,就会生成一个 Operation DAG(有向无环图)。

从这一步可以看出,Table API 和 SQL 在 Flink 中最终都会转化为统一的结构,即 Operation DAG。 

生成RelNode: 

Operation DAG 会被转化为 RelNode(关系表达式) DAG。

优化:

优化器会对 RelNode 做各种优化,优化器的输入是各种优化的规则,以及各种统计信息。 

当前,在 Blink Planner 里面,绝大部分的优化规则,Stream 和 Batch 是共享的。

差异在于,对 Batch 而言,它没有 state 的概念,而对于 Stream 而言,它是不支持 sort 的,所以目前 Blink Planner 中,还是运行了两套独立的规则集(Rule Set),然后定义了两套独立的 Physical Rel:BatchPhysical Rel 和 StreamPhysical Rel。

优化器优化的结果,就是具体的 Physical Rel DAG。

转化:

得到 Physical Rel Dag 后,继续会转化为 ExecNode,通过名字可以看出,ExecNode 已经属于执行层的概念了,但是这个执行层是 Blink 的执行层,在 ExecNode 中,会进行大量的 CodeGen 的操作,还有非 Code 的 Operator 操作

最后,将 ExecNode 转化为 Transformation DAG。

生成可执行 Job Graph:

得到 Transformation DAG 后,最终会被转化成 Job Graph,完成 SQL 或者 Table API 的解析。 

通过这个图,可以看出SQL/Table API如何在flink中流转,最终变为可执行的Graph。从上图可以很清楚的看到,解析的过程涉及到了三层:Table API/SQL,Blink Planner,Runtime

2. Blink Planner 改进及优化

Blink Planner 功能方面改进主要包含如下几个方面:

l 更完整的 SQL 语法支持:例如,IN,EXISTS,NOT EXISTS,subquery,完整的 Over 语句,Group Sets 等。而且已经跑通了所有的 TPCH,TPCDS 这两个测试集,性能还非常不错。

l 提供了更丰富,高效的算子。

l 提供了非常完善的 cost 模型,同时能够对接 Catalog 中的统计信息,使 cost 根据统计信息得到更优的执行plan

l 支持 join reorder。

l shuffle service:对 Batch 而言,Blink Planner 还支持 shuffle service,这对 Batch 作业的稳定性有非常大的帮助

如果遇到 Batch 作业失败,通过 shuffle service 能够很快的进行恢复。 

性能方面,主要包括以下部分:

l 分段优化。

l Sub-Plan Reuse。

l 更丰富的优化 Rule:共一百多个 Rule ,并且绝大多数 Rule 是 Stream 和 Batch 共享的。

l 更高效的数据结构 BinaryRow:能够节省序列化和反序列化的操作。

l mini-batch 支持(仅 Stream):节省 state 的访问的操作。

l 节省多余的 Shuffle 和 Sort(Batch 模式):两个算子之间,如果已经按 A 做 Shuffle,紧接着他下的下游也是需要按 A Shuffle 的数据,那中间的这一层 Shuffle,就可以省略,这样就可以省很多网络的开销,Sort 的情况也是类似。

Sort 和 Shuffle 如果在整个计算里面是占大头,对整个性能是有很大的提升的。

l 深入性能优化及实践

示例 5

create view MyView as select word, count(1) as freq from SourceTable group by word;

insert into SinkTable1 select * from MyView where freq >10;

insert into SinkTable2 select count(word) as freq2, freq from MyView group by freq;

首先创建一个 view,view 执行的是 word count 的例子,从 source 表读数据,然后 group by 每一个 word,看这个word的词频,把刚才的 view 做一个 filter,结果写到sink表1。

对每个词频做排序,相同词频下,word 出现的次数写到 sink 表2。

上面的这几个 SQL,转化为 RelNode DAG,大致图形如下:

 image.png

解析:

最下层的 Scan 是扫描 source 表,然后 group by一个word 做 count1。

左边的一个分支对应的是 sink 1。先是做了一个 filter,然后结果写到 sink 1里面。

右面的分支是得出的结果按 freq 求 group by,然后求出 word count 的结果,写到sink 2。

 image.png

但是在 flink planner 里面(上图),每一个 sink 做优化的时候都是单独进行的,即它是执行一个 query 的时候,flink 就要做一次优化。

在执行 sink 2 的时候,也做一个优化。

可以看到,old planner 只是简单的从 Sink 出发,反向的遍历到 Source,从而形成两个独立的执行链路,从上图也可以清楚的看到,Scan 和第一层 Aggregate 是有重复计算的。

在 Blink Planner 中,经过优化层之后,会生成如下执行层的 DAG:

image.png

Blink Planner 不是在每次调用 insert into 的时候就开始优化,而是先将所有的 insert into 操作缓存起来,等到执行前才进行优化,这样就可以看到完整的执行图,可以知道哪些部分是重复计算的。

Blink Planner 通过寻找可以优化的最大公共子图,找到这些重复计算的部分。

经过优化后,Blink Planner 会将最大公共子图的部分当做一个临时表,供其他部分直接使用。

这样,上面的图可以分为三部分

最大公共子图部分(临时表)

临时表与 Filter 和 SinkTable1 优化

临时表与第二个 Aggregate 和 SinkTable 2 优化

Blink Planner 其实是通过声明的 View 找到最大公共子图的,因此在开发过程中,如果需要复用某段逻辑,就将其定义为 View,这样就可以充分利用 Blink Planner 的分段优化功能,减少重复计算。

当然,当前的优化也不是最完美的,因为提前对图进行了切割,可能会导致一些优化丢失,今后会持续地对这部分算法进行改进。 

总结一下,Blink Planner 的分段优化,其实解的是多 Sink 优化问题(DAG 优化),单 Sink 不是分段优化关心的问题,单 Sink 可以在所有节点上优化,不需要分段。

l Sub-Plan Reuse

insert into SinkTable

select freq from (select word, count(1) as freq from SourceTable group by word) t where word like 'T%'

union all

select count(word) as freq2 from (select word, count(1) as freq from SourceTable group by word) t group by freq;

这个示例的 SQL 和分段优化的 SQL 其实是类似的,不同的是,没有将结果 Sink 到两个 Table 里面,而是将结果 Union 起来,Sink 到一个结果表里面。

下面看一下转化为 RelNode 的 DAG 图:

image.png

从上图可以看出,Scan 和第一层的 Aggregate 也是有重复计算的,Blink Planner 其实也会将其找出来,变成下面的图:

image.png 

Sub-Plan 优化的启用,有两个相关的配置:

· table.optimizer.reuse-sub-plan-enabled (默认开启)

· table.optimizer.reuse-source-enabled(默认开启)

这两个配置,默认都是开启的,用户可以根据自己的需求进行关闭。

这里主要说明一下 table.optimizer.reuse-source-enabled 这个参数。 

在 Batch 模式下,join 操作可能会导致死锁,具体场景是在执行 hash-join 或者 nested-loop-join 时一定是先读 build 端,然后再读 probe 端,如果启用 reuse-source-enabled,当数据源是同一个 Source 的时候,Source 的数据会同时发送给 build 和 probe 端。

这时候,build 端的数据将不会被消费,导致 join 操作无法完成,整个 join 就被卡住了。

为了解决死锁问题,Blink Planner 会先将 probe 端的数据落盘,这样 build 端读数据的操作才会正常,等 build 端的数据全部读完之后,再从磁盘中拉取 probe 端的数据,从而解决死锁问题。

但是,落盘会有额外的开销,会多一次写的操作;有时候,读两次 Source 的开销,可能比一次写的操作更快,这时候,可以关闭 reuse-source,性能会更好。 

当然,如果读两次 Source 的开销,远大于一次落盘的开销,可以保持 reuse-source 开启。

需要说明的是,Stream 模式是不存在死锁问题的,因为 Stream 模式 join 不会有选边的问题。

总结而言,sub-plan reuse 解的问题是优化结果的子图复用问题,它和分段优化类似,但他们是一个互补的过程。

■ Agg 分类优化

Blink 中的 Aggregate 有以下四类:

· group agg

例如:select count(a) from t group by b

为了同其他agg进行区分,query是最常见的group by语句。

· over agg

例如:select count(a) over (partition by b order by c) from t

有 over 的关键字。

· window agg

例如:select count(a) from t group by tumble(ts, interval '10' second), b

有常见的 window 属性。

· table agg

· 例如

tEnv.scan('t').groupBy('a').flatAggregate(flatAggFunc('b' as ('c', 'd')))

只能在 table API 里面引入,允许用户自定义 agg 行为。不像 group agg 多条进,一条出。它可以允许用户多条进多条出。

l Local/Global Agg

主要是为了减少网络 Shuffle 数据。

对于 agg function 可以做 merge 这样的 agg 可以用 local globle 来解。

sum(a)->local sum (a)+globle sum (local result)

Globle sum 是输入式 local sum 的结果。

Distinct Agg 进行优化,主要是对 SQL 语句进行改写,达到优化的目的。

但 Batch 模式和 Stream 模式解决的问题是不同的:

· Stream 模式下,主要是解决热点问题,因为 Stream 需要将所有的输入数据放在 State 里面,如果数据有热点,State 操作会很频繁,这将影响性能。

· Batch 模式下的 distinct Agg,需要先做 distinct,再做 agg,逻辑上需要两步才能实现,直接实现 Distinct Agg 开销太大。 

流里面为什么不需要分开?因为流里面有 state 。在访问 state 的时候天然的可以通过 key 去做一层 distinct。在流里面到 distinct agg 里面来做,但是 batch 里面是不能做到的。

要运用 Local/Global 的优化,必要条件如下:

Aggregate 的所有 Agg Function 都是 mergeable 的,每个 Aggregate 需要实现 merge 方法,例如 SUM,COUNT,AVG,这些都是可以分多阶段完成,最终将结果合并;但是求中位数,计算 95% 这种类似的问题,无法拆分为多阶段,因此,无法运用 Local/Global 的优化。

table.optimizer.agg-phase-strategy 设置为 AUTO 或者 TWO_PHASE。AUTO表示不强制指定一阶段或二阶段,而是让框架来决定。

Stream 模式下,mini-batch 开启 ;Batch 模式下 AUTO 会根据 cost 模型加上统计数据,选择是否进行 Local/Global 优化。绝大多数情况下会选用local globle优化。

看下面一个范例

Query:

select count(*) from t group by color 

没有优化的情况下,下面的这个 Aggregate 会产生 10 次的 Shuffle 操作。

image.png

使用 Local/Global 优化后,加了一层,这时候 local 算子会和前面的算子在本地先进行聚合,然后再进行 Shuffle 操作 

整个 Shuffle 的数据剩下 6 条。在 Stream 模式下,Blink 其实会以 mini-batch 的维度对结果进行预聚合,然后将结果发送给 Global Agg 进行汇总。

为什么在流里面开启mini-batch,是因为local agg的聚合是以mini-batch为单位去做的,会去对mini-batch中的所有数据做一次提前的聚合,然后把聚合的结果发送给最终的globle agg。

l distinct agg

Batch下,强制改写

第一层求 distinct 值和非 distinct agg function 的值,第二层 层求 distinct agg function 的值

select color, count(distinct id), count(*) from t group by color

手工改写成:

select color, count(id), min(cnt) from ( select color, id, count(*) filter (where $e=2) as cnt from (

select color, id, 1 as $e from t --for distinct id union all

select color, null as id, 2 as $e from t -- for count(*)

) group by color, id, $e ) group by color

基本思路是:count distinct 和普通的 count 是没法同时计算的。先做 distinct。

这里改写的思路是一份数据分成两部分。一部分是给 distinct 用,另外一部分给count* 用。

用红色标出来的:第一部分:Select color id as $e。是为了给 count distinct 用的还是给普通的 count*用的,$e1 是给 distinct id 用的。

第二部分:$e2 表示给 count* 用的。因为在 count* 这边是  group by color 。所以说对第二位的 id 统一把它置为一个同样的值。这时候才能保证结果的正确性。

再来看 union 后的结果,把它改写成一个普通的 query,求 count*,但是 count*是有条件的,只允许统计下面这一条数据做 count*。下面一层对这个结果选择直接输出的 agg 例如 min 或者 max 都可以。

对于 count distinct,因为已经在上层做过了,所以在这一层只需要做 count id 即可。

红字部分用 expand 算子表示。

接下来看整条数据的图:

image.png

Stream 下,必要条件 

必须是支持的agg

function:avg/count/min/max/su um/first_value/last_value/ concat_agg/single_value

table.optimizer.distinct-agg.split.enabled开启(默认关闭)

select color,count(distinct id),count(*)

from t group by color

手工改写为:

select color, sum(dcnt), sum(cnt) from ( select color, count(distinct id) as dcnt, count(*) as cnt from t group by color, mod(hash_code(id), 1024) ) group by color

基本思路是:将 count distinct  的 id 做一个 Shuffle,这样能够把原始的 Shuffle 的数据做一个打散。

改写前:

 image.png 

上层数据是输入的数据。白色部分比较多,白色部分为热点。所有白色的热点有可能落到同一个agg里面,会导致这一agg会出现热点。

改写后,逻辑图就会变为下面这样,热点数据被打散到多个中间节点上

image.png 

需要注意的是,示例 5 的 SQL 中 mod(hash_code(id),1024)中的这个 1024 为打散的维度,这个值建议保持原样或者设置大一些,设置太小产生的效果可能不好。

 

四.Q&A

配置文件哪里指定 blink planner 或 flink planner?

答:目前还没发通过配置文件指定。应该通过代码显示指定或者在environment city 里面不指定,而是提供的 flink jar 或者 blink 的 jar 指定。

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
2月前
|
SQL 存储 API
Flink实践:通过Flink SQL进行SFTP文件的读写操作
虽然 Apache Flink 与 SFTP 之间的直接交互存在一定的限制,但通过一些创造性的方法和技术,我们仍然可以有效地实现对 SFTP 文件的读写操作。这既展现了 Flink 在处理复杂数据场景中的强大能力,也体现了软件工程中常见的问题解决思路——即通过现有工具和一定的间接方法来克服技术障碍。通过这种方式,Flink SQL 成为了处理各种数据源,包括 SFTP 文件,在内的强大工具。
157 15
|
1月前
|
SQL 大数据 API
大数据-132 - Flink SQL 基本介绍 与 HelloWorld案例
大数据-132 - Flink SQL 基本介绍 与 HelloWorld案例
43 0
|
2月前
|
SQL 安全 数据处理
揭秘数据脱敏神器:Flink SQL的神秘力量,守护你的数据宝藏!
【9月更文挑战第7天】在大数据时代,数据管理和处理尤为重要,尤其在保障数据安全与隐私方面。本文探讨如何利用Flink SQL实现数据脱敏,为实时数据处理提供有效的隐私保护方案。数据脱敏涉及在处理、存储或传输前对敏感数据进行加密、遮蔽或替换,以遵守数据保护法规(如GDPR)。Flink SQL通过内置函数和表达式支持这一过程。
71 2
|
2月前
|
SQL 大数据 数据处理
奇迹降临!解锁 Flink SQL 简单高效的终极秘籍,开启数据处理的传奇之旅!
【9月更文挑战第7天】在大数据处理领域,Flink SQL 因其强大功能与简洁语法成为开发者首选。本文分享了编写高效 Flink SQL 的实用技巧:理解数据特征及业务需求;灵活运用窗口函数(如 TUMBLE 和 HOP);优化连接操作,优先采用等值连接;合理选择数据类型以减少计算资源消耗。结合实际案例(如实时电商数据分析),并通过定期性能测试与调优,助力开发者在大数据处理中更得心应手,挖掘更多价值信息。
46 1
|
6月前
|
SQL NoSQL Java
Flink SQL 问题之执行报错如何解决
Flink SQL报错通常指在使用Apache Flink的SQL接口执行数据处理任务时遇到的问题;本合集将收集常见的Flink SQL报错情况及其解决方法,帮助用户迅速恢复数据处理流程。
583 2
|
6月前
|
SQL Java 关系型数据库
Flink SQL 问题之用代码执行报错如何解决
Flink SQL报错通常指在使用Apache Flink的SQL接口执行数据处理任务时遇到的问题;本合集将收集常见的Flink SQL报错情况及其解决方法,帮助用户迅速恢复数据处理流程。
681 6
|
6月前
|
SQL 消息中间件 Oracle
Flink SQL 问题之写入ES报错如何解决
Flink SQL报错通常指在使用Apache Flink的SQL接口执行数据处理任务时遇到的问题;本合集将收集常见的Flink SQL报错情况及其解决方法,帮助用户迅速恢复数据处理流程。
103 4
|
6月前
|
SQL JSON Java
Flink SQL 问题之重启报错如何解决
Flink SQL报错通常指在使用Apache Flink的SQL接口执行数据处理任务时遇到的问题;本合集将收集常见的Flink SQL报错情况及其解决方法,帮助用户迅速恢复数据处理流程。
130 3
|
6月前
|
SQL 资源调度 分布式数据库
Flink SQL 问题之服务器报错如何解决
Flink SQL报错通常指在使用Apache Flink的SQL接口执行数据处理任务时遇到的问题;本合集将收集常见的Flink SQL报错情况及其解决方法,帮助用户迅速恢复数据处理流程。
124 3

热门文章

最新文章

下一篇
无影云桌面