待待深度探索 Flink SQL(二)| 学习笔记

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
云解析DNS,个人版 1个月
全局流量管理 GTM,标准版 1个月
简介: 快速学习待待深度探索 Flink SQL。

开发者学堂课程【Apache Flink 入门到实战 - Flink 开源社区出品 待待深度探索 Flink SQL(二)】学习笔记,与课程紧密联系,让用户快速学习知识。

课程地址:https://developer.aliyun.com/learning/course/632/detail/10041


待待深度探索 Flink SQL(二)


三.  Blink Planner

1.blink planner 过程

image.png

Table API&SQL 解析验证:

在 Flink 1.9 中,Table API 进行了大量的重构,引入了一套新的 Operation,这套 Operation 主要是用来描述任务的 Logic Tree。

当 SQL 传输进来后,首先会去做 SQL 解析,SQL 解析完成之后,会得到 SqlNode Tree(抽象语法树),然后会紧接着去做 Validate(验证),验证时会去访问 FunctionManger 和 CatalogManger 

FunctionManger 主要是查询用户定义的 UDF,以及检查 UDF 是否合法 

CatalogManger 主要是检查这个 Table 或者 Database 是否存在,如果验证都通过,就会生成一个 Operation DAG(有向无环图)。

从这一步可以看出,Table API 和 SQL 在 Flink 中最终都会转化为统一的结构,即 Operation DAG。 

生成RelNode: 

Operation DAG 会被转化为 RelNode(关系表达式) DAG。

优化:

优化器会对 RelNode 做各种优化,优化器的输入是各种优化的规则,以及各种统计信息。 

当前,在 Blink Planner 里面,绝大部分的优化规则,Stream 和 Batch 是共享的。

差异在于,对 Batch 而言,它没有 state 的概念,而对于 Stream 而言,它是不支持 sort 的,所以目前 Blink Planner 中,还是运行了两套独立的规则集(Rule Set),然后定义了两套独立的 Physical Rel:BatchPhysical Rel 和 StreamPhysical Rel。

优化器优化的结果,就是具体的 Physical Rel DAG。

转化:

得到 Physical Rel Dag 后,继续会转化为 ExecNode,通过名字可以看出,ExecNode 已经属于执行层的概念了,但是这个执行层是 Blink 的执行层,在 ExecNode 中,会进行大量的 CodeGen 的操作,还有非 Code 的 Operator 操作

最后,将 ExecNode 转化为 Transformation DAG。

生成可执行 Job Graph:

得到 Transformation DAG 后,最终会被转化成 Job Graph,完成 SQL 或者 Table API 的解析。 

通过这个图,可以看出SQL/Table API如何在flink中流转,最终变为可执行的Graph。从上图可以很清楚的看到,解析的过程涉及到了三层:Table API/SQL,Blink Planner,Runtime

2. Blink Planner 改进及优化

Blink Planner 功能方面改进主要包含如下几个方面:

l 更完整的 SQL 语法支持:例如,IN,EXISTS,NOT EXISTS,subquery,完整的 Over 语句,Group Sets 等。而且已经跑通了所有的 TPCH,TPCDS 这两个测试集,性能还非常不错。

l 提供了更丰富,高效的算子。

l 提供了非常完善的 cost 模型,同时能够对接 Catalog 中的统计信息,使 cost 根据统计信息得到更优的执行plan

l 支持 join reorder。

l shuffle service:对 Batch 而言,Blink Planner 还支持 shuffle service,这对 Batch 作业的稳定性有非常大的帮助

如果遇到 Batch 作业失败,通过 shuffle service 能够很快的进行恢复。 

性能方面,主要包括以下部分:

l 分段优化。

l Sub-Plan Reuse。

l 更丰富的优化 Rule:共一百多个 Rule ,并且绝大多数 Rule 是 Stream 和 Batch 共享的。

l 更高效的数据结构 BinaryRow:能够节省序列化和反序列化的操作。

l mini-batch 支持(仅 Stream):节省 state 的访问的操作。

l 节省多余的 Shuffle 和 Sort(Batch 模式):两个算子之间,如果已经按 A 做 Shuffle,紧接着他下的下游也是需要按 A Shuffle 的数据,那中间的这一层 Shuffle,就可以省略,这样就可以省很多网络的开销,Sort 的情况也是类似。

Sort 和 Shuffle 如果在整个计算里面是占大头,对整个性能是有很大的提升的。

l 深入性能优化及实践

示例 5

create view MyView as select word, count(1) as freq from SourceTable group by word;

insert into SinkTable1 select * from MyView where freq >10;

insert into SinkTable2 select count(word) as freq2, freq from MyView group by freq;

首先创建一个 view,view 执行的是 word count 的例子,从 source 表读数据,然后 group by 每一个 word,看这个word的词频,把刚才的 view 做一个 filter,结果写到sink表1。

对每个词频做排序,相同词频下,word 出现的次数写到 sink 表2。

上面的这几个 SQL,转化为 RelNode DAG,大致图形如下:

 image.png

解析:

最下层的 Scan 是扫描 source 表,然后 group by一个word 做 count1。

左边的一个分支对应的是 sink 1。先是做了一个 filter,然后结果写到 sink 1里面。

右面的分支是得出的结果按 freq 求 group by,然后求出 word count 的结果,写到sink 2。

 image.png

但是在 flink planner 里面(上图),每一个 sink 做优化的时候都是单独进行的,即它是执行一个 query 的时候,flink 就要做一次优化。

在执行 sink 2 的时候,也做一个优化。

可以看到,old planner 只是简单的从 Sink 出发,反向的遍历到 Source,从而形成两个独立的执行链路,从上图也可以清楚的看到,Scan 和第一层 Aggregate 是有重复计算的。

在 Blink Planner 中,经过优化层之后,会生成如下执行层的 DAG:

image.png

Blink Planner 不是在每次调用 insert into 的时候就开始优化,而是先将所有的 insert into 操作缓存起来,等到执行前才进行优化,这样就可以看到完整的执行图,可以知道哪些部分是重复计算的。

Blink Planner 通过寻找可以优化的最大公共子图,找到这些重复计算的部分。

经过优化后,Blink Planner 会将最大公共子图的部分当做一个临时表,供其他部分直接使用。

这样,上面的图可以分为三部分

最大公共子图部分(临时表)

临时表与 Filter 和 SinkTable1 优化

临时表与第二个 Aggregate 和 SinkTable 2 优化

Blink Planner 其实是通过声明的 View 找到最大公共子图的,因此在开发过程中,如果需要复用某段逻辑,就将其定义为 View,这样就可以充分利用 Blink Planner 的分段优化功能,减少重复计算。

当然,当前的优化也不是最完美的,因为提前对图进行了切割,可能会导致一些优化丢失,今后会持续地对这部分算法进行改进。 

总结一下,Blink Planner 的分段优化,其实解的是多 Sink 优化问题(DAG 优化),单 Sink 不是分段优化关心的问题,单 Sink 可以在所有节点上优化,不需要分段。

l Sub-Plan Reuse

insert into SinkTable

select freq from (select word, count(1) as freq from SourceTable group by word) t where word like 'T%'

union all

select count(word) as freq2 from (select word, count(1) as freq from SourceTable group by word) t group by freq;

这个示例的 SQL 和分段优化的 SQL 其实是类似的,不同的是,没有将结果 Sink 到两个 Table 里面,而是将结果 Union 起来,Sink 到一个结果表里面。

下面看一下转化为 RelNode 的 DAG 图:

image.png

从上图可以看出,Scan 和第一层的 Aggregate 也是有重复计算的,Blink Planner 其实也会将其找出来,变成下面的图:

image.png 

Sub-Plan 优化的启用,有两个相关的配置:

· table.optimizer.reuse-sub-plan-enabled (默认开启)

· table.optimizer.reuse-source-enabled(默认开启)

这两个配置,默认都是开启的,用户可以根据自己的需求进行关闭。

这里主要说明一下 table.optimizer.reuse-source-enabled 这个参数。 

在 Batch 模式下,join 操作可能会导致死锁,具体场景是在执行 hash-join 或者 nested-loop-join 时一定是先读 build 端,然后再读 probe 端,如果启用 reuse-source-enabled,当数据源是同一个 Source 的时候,Source 的数据会同时发送给 build 和 probe 端。

这时候,build 端的数据将不会被消费,导致 join 操作无法完成,整个 join 就被卡住了。

为了解决死锁问题,Blink Planner 会先将 probe 端的数据落盘,这样 build 端读数据的操作才会正常,等 build 端的数据全部读完之后,再从磁盘中拉取 probe 端的数据,从而解决死锁问题。

但是,落盘会有额外的开销,会多一次写的操作;有时候,读两次 Source 的开销,可能比一次写的操作更快,这时候,可以关闭 reuse-source,性能会更好。 

当然,如果读两次 Source 的开销,远大于一次落盘的开销,可以保持 reuse-source 开启。

需要说明的是,Stream 模式是不存在死锁问题的,因为 Stream 模式 join 不会有选边的问题。

总结而言,sub-plan reuse 解的问题是优化结果的子图复用问题,它和分段优化类似,但他们是一个互补的过程。

■ Agg 分类优化

Blink 中的 Aggregate 有以下四类:

· group agg

例如:select count(a) from t group by b

为了同其他agg进行区分,query是最常见的group by语句。

· over agg

例如:select count(a) over (partition by b order by c) from t

有 over 的关键字。

· window agg

例如:select count(a) from t group by tumble(ts, interval '10' second), b

有常见的 window 属性。

· table agg

· 例如

tEnv.scan('t').groupBy('a').flatAggregate(flatAggFunc('b' as ('c', 'd')))

只能在 table API 里面引入,允许用户自定义 agg 行为。不像 group agg 多条进,一条出。它可以允许用户多条进多条出。

l Local/Global Agg

主要是为了减少网络 Shuffle 数据。

对于 agg function 可以做 merge 这样的 agg 可以用 local globle 来解。

sum(a)->local sum (a)+globle sum (local result)

Globle sum 是输入式 local sum 的结果。

Distinct Agg 进行优化,主要是对 SQL 语句进行改写,达到优化的目的。

但 Batch 模式和 Stream 模式解决的问题是不同的:

· Stream 模式下,主要是解决热点问题,因为 Stream 需要将所有的输入数据放在 State 里面,如果数据有热点,State 操作会很频繁,这将影响性能。

· Batch 模式下的 distinct Agg,需要先做 distinct,再做 agg,逻辑上需要两步才能实现,直接实现 Distinct Agg 开销太大。 

流里面为什么不需要分开?因为流里面有 state 。在访问 state 的时候天然的可以通过 key 去做一层 distinct。在流里面到 distinct agg 里面来做,但是 batch 里面是不能做到的。

要运用 Local/Global 的优化,必要条件如下:

Aggregate 的所有 Agg Function 都是 mergeable 的,每个 Aggregate 需要实现 merge 方法,例如 SUM,COUNT,AVG,这些都是可以分多阶段完成,最终将结果合并;但是求中位数,计算 95% 这种类似的问题,无法拆分为多阶段,因此,无法运用 Local/Global 的优化。

table.optimizer.agg-phase-strategy 设置为 AUTO 或者 TWO_PHASE。AUTO表示不强制指定一阶段或二阶段,而是让框架来决定。

Stream 模式下,mini-batch 开启 ;Batch 模式下 AUTO 会根据 cost 模型加上统计数据,选择是否进行 Local/Global 优化。绝大多数情况下会选用local globle优化。

看下面一个范例

Query:

select count(*) from t group by color 

没有优化的情况下,下面的这个 Aggregate 会产生 10 次的 Shuffle 操作。

image.png

使用 Local/Global 优化后,加了一层,这时候 local 算子会和前面的算子在本地先进行聚合,然后再进行 Shuffle 操作 

整个 Shuffle 的数据剩下 6 条。在 Stream 模式下,Blink 其实会以 mini-batch 的维度对结果进行预聚合,然后将结果发送给 Global Agg 进行汇总。

为什么在流里面开启mini-batch,是因为local agg的聚合是以mini-batch为单位去做的,会去对mini-batch中的所有数据做一次提前的聚合,然后把聚合的结果发送给最终的globle agg。

l distinct agg

Batch下,强制改写

第一层求 distinct 值和非 distinct agg function 的值,第二层 层求 distinct agg function 的值

select color, count(distinct id), count(*) from t group by color

手工改写成:

select color, count(id), min(cnt) from ( select color, id, count(*) filter (where $e=2) as cnt from (

select color, id, 1 as $e from t --for distinct id union all

select color, null as id, 2 as $e from t -- for count(*)

) group by color, id, $e ) group by color

基本思路是:count distinct 和普通的 count 是没法同时计算的。先做 distinct。

这里改写的思路是一份数据分成两部分。一部分是给 distinct 用,另外一部分给count* 用。

用红色标出来的:第一部分:Select color id as $e。是为了给 count distinct 用的还是给普通的 count*用的,$e1 是给 distinct id 用的。

第二部分:$e2 表示给 count* 用的。因为在 count* 这边是  group by color 。所以说对第二位的 id 统一把它置为一个同样的值。这时候才能保证结果的正确性。

再来看 union 后的结果,把它改写成一个普通的 query,求 count*,但是 count*是有条件的,只允许统计下面这一条数据做 count*。下面一层对这个结果选择直接输出的 agg 例如 min 或者 max 都可以。

对于 count distinct,因为已经在上层做过了,所以在这一层只需要做 count id 即可。

红字部分用 expand 算子表示。

接下来看整条数据的图:

image.png

Stream 下,必要条件 

必须是支持的agg

function:avg/count/min/max/su um/first_value/last_value/ concat_agg/single_value

table.optimizer.distinct-agg.split.enabled开启(默认关闭)

select color,count(distinct id),count(*)

from t group by color

手工改写为:

select color, sum(dcnt), sum(cnt) from ( select color, count(distinct id) as dcnt, count(*) as cnt from t group by color, mod(hash_code(id), 1024) ) group by color

基本思路是:将 count distinct  的 id 做一个 Shuffle,这样能够把原始的 Shuffle 的数据做一个打散。

改写前:

 image.png 

上层数据是输入的数据。白色部分比较多,白色部分为热点。所有白色的热点有可能落到同一个agg里面,会导致这一agg会出现热点。

改写后,逻辑图就会变为下面这样,热点数据被打散到多个中间节点上

image.png 

需要注意的是,示例 5 的 SQL 中 mod(hash_code(id),1024)中的这个 1024 为打散的维度,这个值建议保持原样或者设置大一些,设置太小产生的效果可能不好。

 

四.Q&A

配置文件哪里指定 blink planner 或 flink planner?

答:目前还没发通过配置文件指定。应该通过代码显示指定或者在environment city 里面不指定,而是提供的 flink jar 或者 blink 的 jar 指定。

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
10天前
|
SQL 关系型数据库 数据库
实时计算 Flink版操作报错合集之在本地执行代码没有问题,但是在线执行sql命令就会报错,该怎么办
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
10天前
|
SQL 关系型数据库 MySQL
实时计算 Flink版操作报错合集之在执行SQL语句时遇到了类找不到,该怎么解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
20天前
|
SQL 存储 API
Flink(十五)【Flink SQL Connector、savepoint、CateLog、Table API】(5)
Flink(十五)【Flink SQL Connector、savepoint、CateLog、Table API】
|
20天前
|
SQL 消息中间件 Java
Flink(十五)【Flink SQL Connector、savepoint、CateLog、Table API】(4)
Flink(十五)【Flink SQL Connector、savepoint、CateLog、Table API】
|
20天前
|
SQL Java API
Flink(十五)【Flink SQL Connector、savepoint、CateLog、Table API】(3)
Flink(十五)【Flink SQL Connector、savepoint、CateLog、Table API】
|
9天前
|
SQL IDE Java
Java连接SQL Server数据库的详细操作流程
Java连接SQL Server数据库的详细操作流程
|
17天前
|
SQL DataWorks NoSQL
DataWorks产品使用合集之如何将SQL Server中的数据转存到MongoDB
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
244 1
|
1月前
|
SQL API 流计算
实时计算 Flink版产品使用合集之在Mac M1下的Docker环境中开启SQL Server代理的操作步骤是什么
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
162 1
|
2天前
|
SQL 关系型数据库 分布式数据库
PolarDB产品使用问题之如何迁移SQL Server
PolarDB产品使用合集涵盖了从创建与管理、数据管理、性能优化与诊断、安全与合规到生态与集成、运维与支持等全方位的功能和服务,旨在帮助企业轻松构建高可用、高性能且易于管理的数据库环境,满足不同业务场景的需求。用户可以通过阿里云控制台、API、SDK等方式便捷地使用这些功能,实现数据库的高效运维与持续优化。
|
19小时前
|
SQL 存储 机器人
SQL Server 中 RAISERROR 的用法详解
SQL Server 中 RAISERROR 的用法详解