《如何使用实时计算对 Flink 任务进行调优》|学习笔记

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 快速学习 《如何使用实时计算对 Flink 任务进行调优》

开发者学堂课程【《实时计算 Flink 版中级课程》:《如何使用实时计算对 Flink 任务进行调优》】学习笔记,与课程紧密联系,让用户快速学习知识。

课程地址:https://developer.aliyun.com/learning/course/806/detail/13884


《如何使用实时计算对 Flink 任务进行调优》

一、Flink 任务调优

>Metrics & Monitoring

>状态管理与调优

>检查点管理与调优

>Object reuse 与性能优化 .

>序列化器与性能优化

01. Metrics&Monitoring

内容简介:

1. Flink Metrics 系统概览

2. Metrics 的范围&变量

>System Scope

>Variables

3. Metrics 类型

>Counter

>Gauge

>Meter (rate)

>Histogram

4. VVP平台上 Metrics 链路

5.监控中的关键指标

>是否正在"运行”

Uptime

fullRestarts

>Checkpoint情况

numberOfCompletedCheckpoints

numberOfFailedCheckpoints

lastCheckpointSize

6.重点指标

>Task & Operator级别的吞吐

>numRecords(In|Out)PerSecond

>numRecords(In|Out)

>Progress & Event-Time 延迟 

>currentOutputWatermark

>Source connector 追数据情况

>(Kafka) records-lag-max

>(Kinesis) millisBehindLatest

7.JVM 相关内存指标

>Status.JVM.Memory.

>NonHeap.Committed

>Heap.Used

>Heap.Committed

>Direct.MemoryUsed

>Mapped.MemoryUsed

>Status.Flink.Memory .

>Managed.Used

>Managed.Total

8.JVM 相关 CPU 指标

>指标

>Status JVM.CPU.Load

>Status.JVM.CPU.Time

注意:对于 48 核机器上 1CPU 的 TM 来说0.021 = 100%负载

>Flink 统计的CPU指标仅供参考,无法准确统计非 java 的 CPU 使用情况,准确的CPU 使用可以参照调度系统(例如 YARN )的进程 /container CPU 使用情况

9.延迟问题排查

可以查看每个算子的 latency,通过配置 metrics.latency.interval 启用会引入一定的性能损失.

10.排查反压问题

11.定位反压问题

>上游的 outPoolUsage VS.下游的 inputFloatingBuffersUsage

>连续采样

>其他可能的手段>查看 source 的情况

>网络延迟

( credit based flow control )

>单 subtask 反压

>其他资源瓶颈

 

二.状态管理与调优

1.当你的作业规模扩大时

>可能更需要使用 RocksDB,需要启用增 checkpoint,必要时可以考虑 unaligned checkpoint

>清理 checkpoint 变得更重要

>checkpoint 能更容易失败或者超时

>可能更容易碰到外部系统服务的限制

>需要更优雅地处理恢复

2.Task Manager 内存的分布与 state backend

>基于RocksDB

>基于Java Heap Objects

3.回顾基于 Heap 的 state backend

· state 以 java 对象的形式存储在堆上

·基本以 hash table 的格式存储

·每个 state 一个 hash table

·支持异步 checkpoint

·只有在 state snapshot 和 restore 时候才会进行序列化

·吞吐最高

4.state 性能提升相关的考虑

·选择拥有高效拷贝的 typeSerializer

·尽可能避免拷贝

·GC 相关的 tuning

·单台机上多个 task manager 的方式来进行扩并发

5.回顾基于 RocksDB 的 state backend

·state 以序列化字节的方式存储在堆外内存和磁盘上

·键值对数据库,以 LSM tree 的格式进行存储

·key 会序列化成 <keyGroup,key,namespace>

·LSM 天然支持多版本控制功能

·每次读写的时候都会进行序列化/反序列化

6.RocksDB 的 level 层级架构

·key 和 value 都是以序列化字节形式存储。

·MemTable∶新的 writer 会先写入到内存结构

·sst文件∶不可变的排序文件

7.谈 RocksDB 的 LSM

·设计出发点

>顺序磁盘访问比随机访问快很多

>append-only 的日志写模式可以利用这一点,但是不能提供有效的基于键的访问

>SST 文件

>每个文件均包含一部分有序的数据集合

>由于是不可变文件,当 record 更新或者删除时,重复的 record 总会创建出来。

>读取时,先从 memtable 开始,然后才会检查sst文件,以从新到日的顺序读取。

>sst 文件总是紧凑且有序的。

>可以使用 index 和 bloom filter 来优化 sst 文件中的键值访问

8.浅谈 RocksDB 的资源使用

>RocksDB 的实例是 operator 独享的

>一个state 对应一个column family

>SST 文件,MemTable, block cache 以及 compaction 线程

都是 per column family 的 。

>state.backend.rocksdb.block.cache-size

>默认的 block cache大小(8MB)

>state.backend.rocksdb.writebuffer.size

>默认的单个最大 MemTable 大小(64MB)

state.backend.rocksdb.writebuffer.count

>默认的最多 Memtable 数量(2)

9.浅谈 RocksDB 的资源使用

>RocksDB 的实例是 operator 独享的

>一个 state 对应一个 column family

>SST 文件,MemTable, block cache 以及 compaction 线程

都是 per column family 的

Indexes 和 Bloom filters

>可以通过 ConfigurableOptionsFactory 配置

>可以参考

PredefinedOptions#SPINNING_DISK_OPTIMIZED_HIGH_MEM

10.RocksDB 的性能调优

>写放大∶写入磁盘的数据量/写入 DB 的数据量

>读放大∶每次读取时需要读取磁盘的次数

>空间放大∶磁盘存储的数据量/  DB 的数据量

11.RocksDB 与 Timer

>默认情况下,timer 存储在 RocksDB 上,如果体量不大,可以考

虑放在 java 堆上

>如果想将 timer 存储在 Java  堆上,可以配置∶

state.backend.rocksdb.timer-service.factory: HEAP

>需要注意的是,如果使用 RocksDB 存储 state,但是将 timer 存储在堆上,这些timer 会在 snapshot 的同步阶段进行持久化。

12.增量 checkpointing

·目前只有 RocksDB state backend 支持增量 checkpoint

· Flink 会观察到上次成功创建/删除的 SST 文件,从而实现增量上传的目的。(默认情况下,SST 文件是 snappy 压缩的)

· 开启增量 checkpoint 后,进行恢复时,只需要简单地 re-open RocksDB 即可。

13.RocksDB 相关性能考虑

>尽量使用原生 state 类型

>可以开启 RocksDB native metrics 来帮助进行性能调优

>针对不同的硬件选择不同的 RocksDB 配置

>建议优先使用 SSD 磁盘,对性能有显著提升

14.State 过期清理

>不支持 EventTime 的 state 清理

>不支持 queryable state 的 清理

>不支持对已经存在的 state 添加或者移去 StateTtlConfig

 

三、检查点管理与调优

1.Snapshot,Checkpoint 和 Savepoint

·checkpoin 和 Savepoint 都属于snapshot 概念

·Checkpoint V.S Savepoint

2.Checkpoint 的相关配置

·增量还是全量 Checkpoint∶目前只有 RocksDB 支持增量,建议打开

·Exactly-once v.s at-least-once∶是否有 barrier 对齐,一般情况下,at-least-once能更快速完成 Checkpoint

·Checkpoint 间隔

·间隔越大,恢复时要追的数据越多

·对于 two-phase-commit 机制,间隔越大,事务提交地越慢

3.Checkpoint 的相关配置

>允许保存的 Checkpoint 数目∶默认1

>Checkpoint 超时时间∶默认10min

>Checkpoint 文件是否有压缩∶默认没有,RocksDB 天然支持压缩

>Checkpoint 遇到 error 时是否 fail 整个作业∶默认会 fail

>可以同时进行的 Checkpoint 数目∶默认1

>是否开启 unaligned checkpoint∶ 默认关闭

4.为什么 Checkpoint 会超时失败

·Timeout 对当前的 Checkpoint 来说太短了

-非常大的作业规模

-非常大的 state 或者没有开启增量 Checkpoint

-Checkpoint 的分布式文件存储系统网络访问慢

·反压导致 checkpoint barrier 无法及时传递到-反压的原因千千万

·用户方法内存在同步的对外 IO 操作,外部系统异常时导致 IO 操作慢

·数据量增大,作业响应慢

·如果不想因为反压导致 checkpoint 超时,可以考虑 unaligned checkpoint

5.Unaligned checkpoint

·barrier 可以"超前"被 operator 消费

·checkpoint 触发时机与作业反压与否解耦,不再受限制于当前 channel 处理速度

·增大 了 IO 数据量,增大了对磁盘的数据压力

6.checkpoint 相关 UI 监控

7.如何调查 checkpoint 超时原因

·观察 E2E duration,sync duration,async duration

·E2E duration>>sync duration+async duration,说明大部分时间耗在了barrier 传递到相关节点上,推测作业存在反压,通过相关UI工具去验证是否反压。

·async duration 耗时长,写分布式存储性能有问题

·对齐的 buffered 数据量太大

·作业存在反压或者部分 task 问 题,这个数值越大越有问题

·热点问题

·是否有部分 task 的 state 数据量特别大,或者存在明显的反压情况。

8.如何降低 Checkpoint 超时的可能性

·不要实现存在阻塞操作的相关算子

·根据实际情况合理配置超时时间

·使用 unaligned Checkpoint

·选择升级 Flink 的新特性

-FLIP-27 和 FLINK-10886 将支持 source 端的 event time 对齐

 

四、Object reuse 与性能优化

1.回顾 operator chaining

>Chaining

>发送方 out.collect()直接接入接收

>数据是通过保护性拷贝进行传输(不通过 Flink 的网络层序列化传输)

>只有在 forward 数据传输时才能生效

2. Object Reuse

·默认是关闭的

·当启用时

-流计算∶在 operator chain 中传输的是相同的对象

-批计算∶可以有更复杂的 re-use 模式

3.流计算中 Object Reuse 的特点

·Operator 间的数据传输不再是保护性拷贝

·第二个 operator 可以修改一个 operator 的数值

-第一个 operator 内的 filed 可以修改

-基于 JVM Heap 的 state backend

-在 out.collect()之后,第一个 operator 再 访问数据

·存储的 value 可能是不明确的

· window state

· contextkey

4.新计算中 Object Reuse 的 限制

·在方法调用过程中记住输入对象是不安全的

·不要修改输入对象

·可能会修改了输出对象并且再次输出

· 在使用 Object reuse 时,确保使用不可变类型

 

五.序列化器与性能优化

1.Flink 的序列化系统

·数据导入时的序列化

—在向外部系统读写数据时的序列化(例如 Kafka)

·数据传输的序列化

—在 Flink task 间交互数据时的序列化

·状态的序列化

—checkpoint 写时候的序列化,以及 RocksDB 的读写数据

2.Flink 的序列化系统

·原生支持的类型 TypeExtractor#privateGetForClass

·基本类型(Primitive type)

· Tuples, Scala Case Classes

·Avro 类型

·POJO 类型

·不原生支持的类型都会转为 kryo 进行序列化

3.Flink 的序列化系统

3. 自定义序列化器以提升性能

·通过 ExecutionConfig 来注册 kryo

· registerKryoType(Class<?>)

· 针对相应类提供 kryo 的序列化器(需要继承自:

com.esotericsoftware.kryo.Serializer )

4.自定义序列化器以提升性能

>利用 @TypeInfo 创建用户定义的  TypeInformation

5.自定义序列化器以提升性能

·创建 POJO 类借用 pojo serializer

· 类必须是 public 且不含静态内部类(no non-static inner class)

·类必须有一个无参构造方法 (no-argument constructor )

·所有非静态属性必须是(non- final)public 或者有 public 的 getter 和 setter


六、Object reuse

1.Object reuse 作业练习题

·源码下载

>将摄氏度转换为华氏温度

>利用指数移动平均求平均值

进入指标页面,观察每秒发送的 record 数量,从而获知作业 TPS

2. Object reuse 作业练习题

启动的 class 入口

-并发度设置为 2

·问题1∶如果开启 object reuse

(传入参数--objectReuse true),比较性能区别

·问题2∶注意观察为了保证正确性,MovingAverageSensors 和ConvertToLocalTemperature 类做了哪些努力

>启动的 class 入口

com.ververica.flinktraining.solutions.troubleshoot.0bjectReuseJobSolution1

>并发度设置为 2

问题 3∶

MovingAverageSensors 和 ConvertToLocalTemperature 的传输的对象是否可以用更高效的 serializer 来优化性能 (不用 kryo)?观察 metrics 中的每秒输出 record 数进行调优。

-Hint∶

com.ververica.flinktraining.solutions.troubleshoot.immutable下有提供相关不可变类以及自定义的 serializer


如何使用 AutoPilot 对作业自动调优

目录

· AutoPilot 整体架构

· AutoPilot 自动调优策略

· AutoPilot 应用实操

· AutoPilot 使用注意事项

·智能诊断功能(VVP 2.4.0)

一、AutoPilot 整体架构

1.Agent基本工作流程

图片1.png

二、AutoPilot 自动调优策略

1.基于 Cpu 利用率的调优策略

·CPU 利用率高于指定阈值时,进行 scale up

·CPU 利用率长时间低于指定阈值时,进行 s cale down

·IO Bound 类型的作业需要把 CPU 的阈值调小

·优点∶简单高效, 没有额外依赖

·缺点∶算法精度低,收敛速度慢

2.基于内存使用的调优策略

·内存的调优策略主要是根据作业内存实际利用率,以及 GC 的 metric 来动态调整tm 的内存

·整体内存利用率高于指定阈值时,增大 TM 的内存

·整体内存利用率长时间低于指定阈值时,减少 TM 的内存

·TM GC 时 间、GC 频率超过阈值时,增大 TM 的内存

3.基于 Delay 的调优策略

·当 Source 出现延迟时,此时作业处于反压的状态吞吐达到最大

·根据 Delay 的变化率,以及反压状态下的吞吐,可以快速的预估出作业实际需要处理的 TPS 和并发度

·优点∶算法精度高,可以实现作业快速回复健康状态

·缺点∶依赖 delay metric,支持大部分 SQL 作业中用到的 connector

已支持社区 FLIP-33 标准化的 Connector Metrics,覆盖所有实现这一标准 metric体系的 connector

4.基于 Slot 利用率的调优策略

· Flink-1.11 开始完成作业数据处理模型的升级,Mailbox 模式变成默认处理模式,每个任务处理模型简化变成单线程更加高效,为了衡量一个 mail 是否延迟,我们引入 IdleTime 的 metric∶用来统计一个 task 没有处理数据的时间

·综合所有并发的 IdeTime,我们可以快速衡量一个 vertex 是否空闲,能不能进讲行并发度的回收

· 优点∶算法精度高,可以准确的衡量每 个 vertex 的空闲程度

能根据slot利用率来进行 scale up/scale down 作业的并发度

5.基于 JobException 的调优策略

· 作业在启动或运行过程中,因为内存不足或者其它原因会造成作业持续 failover

· Autopilot 会对 job Exception 进行根因诊断,通过调整作业资源来保持作业的稳定

· 优点∶

自动恢复因为内存配置不足的作业

有效解决 JM OOM 的场景

6.策略冲突怎么办?

AutoPilot 会智能判断选择!

7.AutoPilot 三种工作形态

·Active

· Monitor(默认开启)

·Disabled (非常不建议)

8.AutoPilot 高级配置

· 限制调整资源的上下限

· 调整各个 trigger 指标的阈值

· 调整 scale up/down 动作的 ratio

· 限制调整并发的上下限

四、AutoPilot 应用实操

1. 开启 AutoPilot

2.查看作业的当前状态

3.查看历史操作记录

4.未开启/开启 AutoPilot 效果

五、 AutoPilot  使用注意事项

1.AutoPilot 使用注意事项

· AutoPilot 修改并发度是通过默认的并发度来实现的,因此作业每个节点都不能设置并发度,否则就无法实现动态调节

· AutoPilot 触发后需要重启作业,会导致作业短暂停止处理数据

· AutoPilot 策略对作业的处理模式假设∶

1.流量平滑变化、不能有数据倾斜、每个算子的吞吐能力能够随并发度线性拓展

2.当作业 pattern 严重偏离这几个假设时,可能会存在作业异常,没有触发 scale,或者算法无法收敛,作业持续重启等异常场景,此时需要关闭 AutoPilot  手动调优

· AutoPilot 目前无法识别外部系统的问题∶当外部系统故障,访问变慢时,会导致作业 scale up,导致输出压力变大,

会加重外部系统的压力,导致外部系统雪崩

· Autopilot 目前不支持 session cluster 部署的作业

· Deployment 后需要手动重启作业

>作业状态

> Failover 汇总

>Check Poin t 状态

>数据倾斜

>数据过滤

>K8S 资源

> Slot 利用率

>SQL聚合优化

>TM/JM cpu/mem 状态

3.下图左侧就是作业的信息

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
9天前
|
运维 数据处理 数据安全/隐私保护
阿里云实时计算Flink版测评报告
该测评报告详细介绍了阿里云实时计算Flink版在用户行为分析与标签画像中的应用实践,展示了其毫秒级的数据处理能力和高效的开发流程。报告还全面评测了该服务在稳定性、性能、开发运维及安全性方面的卓越表现,并对比自建Flink集群的优势。最后,报告评估了其成本效益,强调了其灵活扩展性和高投资回报率,适合各类实时数据处理需求。
|
2月前
|
SQL 消息中间件 Kafka
实时计算 Flink版产品使用问题之如何在EMR-Flink的Flink SOL中针对source表单独设置并行度
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
10天前
|
存储 运维 监控
阿里云实时计算Flink版的评测
阿里云实时计算Flink版的评测
42 15
|
9天前
|
运维 分布式计算 监控
评测报告:阿里云实时计算Flink版
本评测主要针对阿里云实时计算Flink版在用户行为分析中的应用。作为一名数据分析师,我利用该服务处理了大量日志数据,包括用户点击流和登录行为。Flink的强大实时处理能力让我能够迅速洞察用户行为变化,及时调整营销策略。此外,其卓越的性能和稳定性显著降低了运维负担,提升了项目效率。产品文档详尽且易于理解,但建议增加故障排查示例。
|
9天前
|
机器学习/深度学习 运维 监控
阿里云实时计算Flink版体验评测
阿里云实时计算Flink版提供了完善的产品内引导和丰富文档,使初学者也能快速上手。产品界面引导清晰,内置模板简化了流处理任务。官方文档全面,涵盖配置、开发、调优等内容。此外,该产品在数据开发和运维方面表现优秀,支持灵活的作业开发和自动化运维。未来可增强复杂事件处理、实时可视化展示及机器学习支持,进一步提升用户体验。作为阿里云大数据体系的一部分,它能与DataWorks、MaxCompute等产品无缝联动,构建完整的实时数据处理平台。
|
2月前
|
消息中间件 监控 Kafka
联通实时计算平台问题之Flink状态后端数据量较大时,问题排查要如何进行
联通实时计算平台问题之Flink状态后端数据量较大时,问题排查要如何进行
|
2月前
|
监控 Cloud Native 流计算
实时计算 Flink版产品使用问题之如何查看和管理任务
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
2月前
|
资源调度 Java Scala
实时计算 Flink版产品使用问题之如何实现ZooKeeper抖动导致任务失败时,能从最近的检查点重新启动任务
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
2月前
|
Kubernetes Java 数据库连接
实时计算 Flink版产品使用问题之部署到 Kubernetes 集群时,任务过一会儿自动被取消,该如何排查
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
2月前
|
资源调度 安全 数据处理
实时计算 Flink版产品使用问题之提交任务时如何设置TaskManager的数量
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
下一篇
无影云桌面