Flinksql性能优化

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: Flinksql性能优化

1 设置空闲状态保留时间

2 开启MiniBatch

3 开启LocalGlobal

4 开启Split Distinct

5 多维DISTINCT 使用Filter

6 设置参数总结

设置空闲状态保留时间

Flink SQL新手有可能犯的错误,其中之一就是 忘记设置空闲状态保留时间导致状态爆炸。列举两个场景:

  • FlinkSQL 的 regu lar join inner 、 left 、 right ),左右表的数据都会 一直保 存在
    状态里,不会清理!要么设置 TTL ,要么使用 Flink SQL 的 interval join 。
  • 使用 Top N 语法进行去重,重复数据的出现一般都位于特定区间内(例如一小时
    或一天内),过了这段时间之后,对应的状态就不再需要了。

Flink SQL可以指定空闲状态(即未更新的状态)被保留的最小时间 当状态中某个 key

对应的 状态 未更新的时间达到阈值时,该条状态被自动清理。

#API 指定
tableEnv.getConfig().setIdleStateRetention(Duration.ofHours(1));
参数指定
Configuration configuration = tableEnv.getConfig().getConfiguration();
configuration.setString("table.exec.state.ttl", " 1 h" )

开启MiniBatch

 

MiniBatch是微批处理,原理是 缓存一定的数据后再触发处理,以减少对 State 的访问从而提升吞吐并减少数据的输出量。MiniBatch主要依靠在每个Task上注册的Timer线程来触发微批,需要消耗一定的线程调度性能。

60a6bcefe26f4b118e50f46e4d0afd1d.png

  • MiniBatch 默认关闭,开启方式如下
// 初始化 table environment
TableEnvironment tEnv = ...
// 获取 tableEnv 的配置对象
Configuration configuration = tEnv.getConfig().getConfiguration();
// 设置参数:
开启 miniBatch
configuration.setString(" table.exec.mini batch.enabled ", true);
批量输出的间隔时间
configuration.setString(" table.exec.mini batch.allow latency ", 5 s);
防止 OOM 设置每个批次最多缓存数据的条数 ,可以设为 2 万条
configuration.setString(" table.exec.mini batch.size ", 20000);
  • 适用场景

微批处理通过增加延迟换取高吞吐,如果有超低延迟的要求,不建议开启微批处理。通

常对于聚合的场景,微批处理可以显著的提升系统性能,建议开启。

60a6bcefe26f4b118e50f46e4d0afd1d.png

注意事项:

1、目前, key value 配置项仅被 Blink planner 支持。

2、1.12 之前的版本有 bug ,开启 miniBatch ,不会清理过期状态,也就是说如果设置状态的 TTL ,无法清理过期状态。1.12 版本才修复这个问题 。参考ISSUE:https://issues.apache.org/jira/browse/FLINK 17096

开启 LocalGlobal

 

LocalGlobal优化将原先的 Aggregate 分成 Local+Global 两阶段聚合,即MapReduce 模型中的 Combine+Reduce 处理模式。第一阶段在上游节点本地攒一批数据进行聚合( localAgg ),并输出这次微批的增量值 A ccumulator )。第二阶段再将收到的 Accumulator 合并( Merge ),得到最终的结果 GlobalAgg )


LocalGlobal本质上能够靠 LocalAgg 的聚合筛除部分倾斜数据,从而降低 GlobalAgg的热点,提升性能。结合下图理解 LocalGlobal 如何解决数据倾斜的问题。

60a6bcefe26f4b118e50f46e4d0afd1d.png

由上图可知:

  • 未开启 LocalGlobal 优化,由于流中的数据倾斜, Key 为红色的聚合算子实例需要处理更多的记录,这就导致了热点问题。
  • 开启 LocalGlobal 优化后,先进行本地聚合,再进行全局聚合。可大大减少 GlobalAgg的热点,提高性能。

LocalGlobal 开启方式:

1)LocalGlobal 优化需要先开启 MiniBatch ,依赖于 MiniBatch 的参数。

2)table.optimizer.agg phase strategy : 聚合策略。默认 AUTO ,支持参数 AUTO 、TWO_PHASE( 使用 LocalGlobal 两阶段聚合 、 ONE_PHASE( 仅使用 Global 一阶段聚合)。

// 初始化 table environment
TableEnvironment tEnv = ...
// 获取 tableEnv 的配置对象
Configuration configuration = tEnv.getConfig().getConfiguration();
// 设置参数:
开启 miniBatch
configuration.setString("table.exec.mini batch.enabled ", true);
批量输出的间隔时间
configuration.setString("table.exec.mini batch.allow latency ", "5s");
防止 OOM 设置每个批次最多缓存数据的条数 ,可以设为 2 万条
configuration.setString("table.exec.mini batch.size ", "20000");
// 开启 LocalGlobal
config uration.setString("table.optimizer.agg phase strategy ", "TWO_PHASE");

注意事项:
1)需要先开启 MiniBatch
2)开启 LocalGlobal 需要 UDAF 实现 Merge 方法 。

提交案例:统计每天每个 mid 出现次数

60a6bcefe26f4b118e50f46e4d0afd1d.png

可以看到存在数据倾斜。

提交案例:开启 miniBatch 和 LocalGlobal

60a6bcefe26f4b118e50f46e4d0afd1d.png

从WebUI 可以看到分组聚合变成了 Local 和 Global 两部分, 数据相对均匀,且没有数据倾斜。

开启Split Distinct

 

LocalGlobal优化针对普通聚合(例如 SUM 、 COUNT 、 MAX 、 MIN 和 AVG )有较好的效果,对于 DISTINCT 的聚合(如 COUNT DISTINCT 收效不明显,因为 COUNT DISTINCT 在 Local 聚合时,对于 DISTINCT KEY 的去重率不高,导致在 Global 节点仍然存在热点。


原理概述

之前,为了解决COUNT DISTINCT 的热点问题,通常需要手 动改写为两层聚合(增加按 Distinct Key取模的打散层)。


从 Flink1.9.0 版本开始,提供了 COUNT DISTINCT 自动打散功能, 通过HASH_CODE(distinct_key) % BUCKET_NUM 打散,不需要手动重写。Split Distinct 和LocalGlobal 的原理对比参见下图。

60a6bcefe26f4b118e50f46e4d0afd1d.png

Distinct举例

SELECT a,COUNT(DISTINCT b
FROM T
GROUP BY a

手动打散举例

SELECT a,SUM(
FROM (
  SELECT a,COUNT(DISTINCT b ) as cnt
  FROM T
  GROUP BY a,MOD(HASH_ b),
GROUP BY a

    默认不开启,使用参数显式开启

    table.optimizer.distinct agg.split.enabled: true 默认 false 。

    table.optimizer.distinct agg.split.bucket num: Split Distinct 优化在第一层聚合中,被打散的bucket 数目。默认 1024。

    // 初始化 table environment
    TableEnvironment tEnv = ...
    // 获取 tableEnv 的配置对象
    Configuration configuration = tEnv.getConfig().getConfiguration();
    // 设置参数:要结合 minibatch 一起 使用
    开启 Split Distinct
    configuration.setString(" table.optimizer.distinct agg.split.enabled ", "true");
    第一层 打 散 的 bucket 数目
    configuration.setString(" table.optimizer.distinct agg.split.bucket num ", "1024");

    注意事项:
    (1)目前不能在包含 UDAF 的 Flink SQL 中使用 Split Distinct 优化方法。
    (2)拆分出来的两个 GROUP 聚合还可参与 LocalGlobal 优化。
    (3)该功能在 F l ink1.9.0 版本 及以上版本才支持。

    提交案例:count ( 存在热点问题)

    60a6bcefe26f4b118e50f46e4d0afd1d.png

    提交案例:开启 split distinct

    60a6bcefe26f4b118e50f46e4d0afd1d.png

    从WebUI 可以看到有两次聚合,而且有 partialFinal 字样,第二次聚合时已经均匀 。

    多维DISTINCT 使用Filter


    在某些场景下,可能需要从不同维度来统计count distinct )的结果 (比如统计 uv 、app 端的 uv 、 web 端的 uv 可能会使用如下 CASE WHEN 语法 。

    SELECT
      a,
      COUNT(DISTINCT b ) AS total_ b,
      COUNT(DISTINCT CASE WHEN c IN (' A ', B ') THEN b ELSE NULL END) AS AB b,
      COUNT(DISTINCT CASE WHEN c IN (' C ', D ') THEN b ELSE NULL END) AS CD_b
    FROM T
    GROUP BY a
    

    在这种情况下,建议使用FILTER 语法 , 目前的 Flink SQL 优化器可以识别同一唯一键上的不同 FILTER 参数。如,在上面的示例中,三个 COUNT DISTINCT 都作用在 b 列上。此时,经过优化器识别后,Flink 可以只使用一个共享状态实例,而不是三个状态实例,可减少状态的大小和对状态的访问。

    将上边的CASE WHEN 替换成 FILTER 后 ,如下所示:

    SELECT
      a,
      COUNT(DISTINCT b ) AS b,
      COUNT(DISTINCT b ) FILT ER (WHERE c IN (' A ', B ')) AS AB_b,
      COUNT(DISTINCT b ) FILTER (WHERE c IN (' C ', D ')) AS CD b
    FROM T
    GROUP BY a
    

    提交案例 :多维 Distinct

    60a6bcefe26f4b118e50f46e4d0afd1d.png

    提交案例:使用 Filter

    60a6bcefe26f4b118e50f46e4d0afd1d.png

    通过WebUI 对比 前 1 0 次 Checkpoint 的大小,可以看到 状态 有所减小。

    设置参数总结

    批量输出的间隔时间
    configuration.setString("table.exec.mini batch.allow latency ", "5s");
    防止 OOM 设置每个批次最多缓存数据的条数 ,可以设为 2 万条
    configuration.setString("table.exec.mini batch.size ", "20000");
    // 开启 LocalGlobal
    config uration.setString("table.optimizer.agg phase strategy ", "TWO_PHASE");
    // 初始化 table environment
    TableEnvironment tEnv = ...
    // 获取 tableEnv 的配置对象
    Configuration configuration = tEnv.getConfig().getConfiguration();
    // 设置参数:
    开启 miniBatch
    configuration.setString(" table.exec.mini batch.enabled ", "true");
    批量输出的间隔时间
    configuration.setString(" table.exec.mini batch.allow latency ", "5s");
    防止 OOM 设置每个批次最多缓存数据的条数 ,可以设为 2 万条
    configuration.setString(" table.exec.mini batch.size ", 20000);
    // 开启 LocalGlobal
    configuration.setString(" table.optimizer.agg phase strategy ", "TWO_PHASE");
    开启 Split Distinct
    configuration.setString(" table.optimizer.distinct agg.split.enabled ", "true");
    第一层 打 散 的 bucket 数目
    configuration.setString(" table.optimizer.distinct agg.split.bucket num ", "1024");
    指定时区
    configuration.setString(" table.local time zone ", "Asia/Shang hai");
    


    相关实践学习
    基于Hologres轻松玩转一站式实时仓库
    本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
    Linux入门到精通
    本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
    相关文章
    |
    1月前
    |
    存储 关系型数据库 MySQL
    【性能优化】MySql查询性能优化必知必会
    【性能优化】MySql查询性能优化必知必会
    91 0
    【性能优化】MySql查询性能优化必知必会
    |
    9月前
    |
    SQL 存储 算法
    ClickHouse性能优化 3
    ClickHouse性能优化
    327 0
    |
    1月前
    |
    SQL 数据采集 监控
    14个Flink SQL性能优化实践分享
    本文档详细列举了Apache Flink SQL的性能调优策略。主要关注点包括:增加数据源读取并行度、优化状态管理(如使用RocksDB状态后端并设置清理策略)、调整窗口操作以减少延迟、避免类型转换和不合理的JOIN操作、使用广播JOIN、注意SQL查询复杂度、控制并发度和资源调度、自定义源码实现、执行计划分析、异常检测与恢复、监控报警、数据预处理与清洗、利用高级特性(如容器化部署和UDF)以及数据压缩与序列化。此外,文档还强调了任务并行化、网络传输优化、系统配置调优、数据倾斜处理和任务调度策略。通过这些方法,可以有效解决性能问题,提升Flink SQL的运行效率。
    |
    1月前
    |
    SQL 资源调度 监控
    Flink SQL性能优化实践
    Apache Flink流处理性能优化指南:探索数据源读取并行度、状态管理、窗口操作的优化策略,包括设置默认并行度、使用RocksDB状态后端、调整窗口大小。调优方法涉及数据源分区、JOIN条件优化、使用Broadcast JOIN。注意SQL复杂度、并发控制与资源调度,如启用动态资源分配。源码层面优化自定义Source和Sink,利用执行计划分析性能瓶颈。异常检测与恢复通过启用检查点,监控任务性能。预处理数据、使用DISTINCT去重,结合UDF提高效率。选择高效序列化框架和启用数据压缩,优化网络传输和系统配置。处理数据倾斜,均衡数据分布,动态调整资源和任务优先级,以提升整体性能。
    59 2
    |
    1月前
    |
    存储 缓存 监控
    Flink性能优化小结
    Flink性能优化小结
    |
    1月前
    |
    存储 数据采集 监控
    Flink中的性能优化有哪些方法?请举例说明。
    Flink中的性能优化有哪些方法?请举例说明。
    34 0
    |
    9月前
    |
    SQL 存储 分布式计算
    Hive性能优化之表设计优化2
    Hive性能优化之表设计优化2
    63 1
    |
    9月前
    |
    存储 SQL 固态存储
    ClickHouse性能优化 2
    ClickHouse性能优化
    359 0
    |
    9月前
    |
    存储 SQL HIVE
    ClickHouse性能优化 1
    ClickHouse性能优化
    174 0
    |
    存储 SQL JSON
    MySql查询性能优化必知必会
    作为一个写业务代码的 "JAVA CURD BOY" ,具备写出高效率SQL让应用高性能访问数据库的能力非常重要。获得这个能力的过程我收获了点知识和经验,今天在这里分享出来,希望大家多多交流指点。 本文内容主要包括以下几个方面:分析查询SQL,MySQL查询优化器、数据库存储结构、索引,索引维护,索引设计,SQL优化,表结构设计,分库分表,查询功能架构设计。
    492 0