开发者学堂课程【《实时计算 Flink 版中级课程》:《如何使用实时计算对 Flink 任务进行调优》】学习笔记,与课程紧密联系,让用户快速学习知识。
课程地址:https://developer.aliyun.com/learning/course/806/detail/13884
《如何使用实时计算对 Flink 任务进行调优》
一、Flink 任务调优
>Metrics & Monitoring
>状态管理与调优
>检查点管理与调优
>Object reuse 与性能优化 .
>序列化器与性能优化
01. Metrics&Monitoring
内容简介:
1. Flink Metrics 系统概览
2. Metrics 的范围&变量
>System Scope
>Variables
3. Metrics 类型
>Counter
>Gauge
>Meter (rate)
>Histogram
4. VVP平台上 Metrics 链路
5.监控中的关键指标
>是否正在"运行”
Uptime
fullRestarts
>Checkpoint情况
●numberOfCompletedCheckpoints
●numberOfFailedCheckpoints
●lastCheckpointSize
6.重点指标
>Task & Operator级别的吞吐
>numRecords(In|Out)PerSecond
>numRecords(In|Out)
>Progress & Event-Time 延迟
>currentOutputWatermark
>Source connector 追数据情况
>(Kafka) records-lag-max
>(Kinesis) millisBehindLatest
7.JVM 相关内存指标
>Status.JVM.Memory.
>NonHeap.Committed
>Heap.Used
>Heap.Committed
>Direct.MemoryUsed
>Mapped.MemoryUsed
>Status.Flink.Memory .
>Managed.Used
>Managed.Total
8.JVM 相关 CPU 指标
>指标
>Status JVM.CPU.Load
>Status.JVM.CPU.Time
注意:对于 48 核机器上 1CPU 的 TM 来说0.021 = 100%负载
>Flink 统计的CPU指标仅供参考,无法准确统计非 java 的 CPU 使用情况,准确的CPU 使用可以参照调度系统(例如 YARN )的进程 /container CPU 使用情况
9.延迟问题排查
可以查看每个算子的 latency,通过配置 metrics.latency.interval 启用会引入一定的性能损失.
10.排查反压问题
11.定位反压问题
>上游的 outPoolUsage VS.下游的 inputFloatingBuffersUsage
>连续采样
>其他可能的手段>查看 source 的情况
>网络延迟
( credit based flow control )
>单 subtask 反压
>其他资源瓶颈
二.状态管理与调优
1.当你的作业规模扩大时
>可能更需要使用 RocksDB,需要启用增 checkpoint,必要时可以考虑 unaligned checkpoint
>清理 checkpoint 变得更重要
>checkpoint 可能更容易失败或者超时
>可能更容易碰到外部系统服务的限制
>需要更优雅地处理恢复
2.Task Manager 内存的分布与 state backend
>基于RocksDB
>基于Java Heap Objects
3.回顾基于 Heap 的 state backend
· state 以 java 对象的形式存储在堆上
·基本以 hash table 的格式存储
·每个 state 一个 hash table
·支持异步 checkpoint
·只有在 state snapshot 和 restore 时候才会进行序列化
·吞吐最高
4.state 性能提升相关的考虑
·选择拥有高效拷贝的 typeSerializer
·尽可能避免拷贝
·GC 相关的 tuning
·单台机上多个 task manager 的方式来进行扩并发
5.回顾基于 RocksDB 的 state backend
·state 以序列化字节的方式存储在堆外内存和磁盘上
·键值对数据库,以 LSM tree 的格式进行存储
·key 会序列化成 <keyGroup,key,namespace>
·LSM 天然支持多版本控制功能
·每次读写的时候都会进行序列化/反序列化
6.RocksDB 的 level 层级架构
·key 和 value 都是以序列化字节形式存储。
·MemTable∶新的 writer 会先写入到内存结构
·sst文件∶不可变的排序文件
7.谈 RocksDB 的 LSM
·设计出发点
>顺序磁盘访问比随机访问快很多
>append-only 的日志写模式可以利用这一点,但是不能提供有效的基于键的访问
>SST 文件
>每个文件均包含一部分有序的数据集合
>由于是不可变文件,当 record 更新或者删除时,重复的 record 总会创建出来。
>读取时,先从 memtable 开始,然后才会检查sst文件,以从新到日的顺序读取。
>sst 文件总是紧凑且有序的。
>可以使用 index 和 bloom filter 来优化 sst 文件中的键值访问
8.浅谈 RocksDB 的资源使用
>RocksDB 的实例是 operator 独享的
>一个state 对应一个column family
>SST 文件,MemTable, block cache 以及 compaction 线程
都是 per column family 的 。
>state.backend.rocksdb.block.cache-size
>默认的 block cache大小(8MB)
>state.backend.rocksdb.writebuffer.size
>默认的单个最大 MemTable 大小(64MB)
state.backend.rocksdb.writebuffer.count
>默认的最多 Memtable 数量(2)
9.浅谈 RocksDB 的资源使用
>RocksDB 的实例是 operator 独享的
>一个 state 对应一个 column family
>SST 文件,MemTable, block cache 以及 compaction 线程
都是 per column family 的
Indexes 和 Bloom filters
>可以通过 ConfigurableOptionsFactory 配置
>可以参考
PredefinedOptions#SPINNING_DISK_OPTIMIZED_HIGH_MEM
10.RocksDB 的性能调优
>写放大∶写入磁盘的数据量/写入 DB 的数据量
>读放大∶每次读取时需要读取磁盘的次数
>空间放大∶磁盘存储的数据量/ DB 的数据量
11.RocksDB 与 Timer
>默认情况下,timer 存储在 RocksDB 上,如果体量不大,可以考
虑放在 java 堆上
>如果想将 timer 存储在 Java 堆上,可以配置∶
state.backend.rocksdb.timer-service.factory: HEAP
>需要注意的是,如果使用 RocksDB 存储 state,但是将 timer 存储在堆上,这些timer 会在 snapshot 的同步阶段进行持久化。
12.增量 checkpointing
·目前只有 RocksDB state backend 支持增量 checkpoint
· Flink 会观察到上次成功创建/删除的 SST 文件,从而实现增量上传的目的。(默认情况下,SST 文件是 snappy 压缩的)
· 开启增量 checkpoint 后,进行恢复时,只需要简单地 re-open RocksDB 即可。
13.RocksDB 相关性能考虑
>尽量使用原生 state 类型
>可以开启 RocksDB native metrics 来帮助进行性能调优
>针对不同的硬件选择不同的 RocksDB 配置
>建议优先使用 SSD 磁盘,对性能有显著提升
14.State 过期清理
>不支持 EventTime 的 state 清理
>不支持 queryable state 的 清理
>不支持对已经存在的 state 添加或者移去 StateTtlConfig
三、检查点管理与调优
1.Snapshot,Checkpoint 和 Savepoint
·checkpoin 和 Savepoint 都属于snapshot 概念
·Checkpoint V.S Savepoint
2.Checkpoint 的相关配置
·增量还是全量 Checkpoint∶目前只有 RocksDB 支持增量,建议打开
·Exactly-once v.s at-least-once∶是否有 barrier 对齐,一般情况下,at-least-once能更快速完成 Checkpoint
·Checkpoint 间隔
·间隔越大,恢复时要追的数据越多
·对于 two-phase-commit 机制,间隔越大,事务提交地越慢
3.Checkpoint 的相关配置
>允许保存的 Checkpoint 数目∶默认1
>Checkpoint 超时时间∶默认10min
>Checkpoint 文件是否有压缩∶默认没有,RocksDB 天然支持压缩
>Checkpoint 遇到 error 时是否 fail 整个作业∶默认会 fail
>可以同时进行的 Checkpoint 数目∶默认1
>是否开启 unaligned checkpoint∶ 默认关闭
4.为什么 Checkpoint 会超时失败
·Timeout 对当前的 Checkpoint 来说太短了
-非常大的作业规模
-非常大的 state 或者没有开启增量 Checkpoint
-Checkpoint 的分布式文件存储系统网络访问慢
·反压导致 checkpoint barrier 无法及时传递到-反压的原因千千万
·用户方法内存在同步的对外 IO 操作,外部系统异常时导致 IO 操作慢
·数据量增大,作业响应慢
·如果不想因为反压导致 checkpoint 超时,可以考虑 unaligned checkpoint
5.Unaligned checkpoint
·barrier 可以"超前"被 operator 消费
·checkpoint 触发时机与作业反压与否解耦,不再受限制于当前 channel 处理速度
·增大 了 IO 数据量,增大了对磁盘的数据压力
6.checkpoint 相关 UI 监控
7.如何调查 checkpoint 超时原因
·观察 E2E duration,sync duration,async duration
·E2E duration>>sync duration+async duration
,说明大部分时间耗在了barrier 传递到相关节点上,推测作业存在反压,通过相关UI工具去验证是否反压。
·async duration 耗时长,写分布式存储性能有问题
·对齐的 buffered 数据量太大
·作业存在反压或者部分 task 问 题,这个数值越大越有问题
·热点问题
·是否有部分 task 的 state 数据量特别大,或者存在明显的反压情况。
8.如何降低 Checkpoint 超时的可能性
·不要实现存在阻塞操作的相关算子
·根据实际情况合理配置超时时间
·使用 unaligned Checkpoint
·选择升级 Flink 的新特性
-FLIP-27 和 FLINK-10886 将支持 source 端的 event time 对齐
四、Object reuse 与性能优化
1.回顾 operator chaining
>Chaining
>发送方 out.collect()直接接入接收方
>数据是通过保护性拷贝进行传输(不通过 Flink 的网络层序列化传输)
>只有在 forward 数据传输时才能生效
2. Object Reuse
·默认是关闭的
·当启用时
-流计算∶在 operator chain 中传输的是相同的对象
-批计算∶可以有更复杂的 re-use 模式
3.流计算中 Object Reuse 的特点
·Operator 间的数据传输不再是保护性拷贝
·第二个 operator 可以修改一个 operator 的数值
-第一个 operator 内的 filed 可以修改
-基于 JVM Heap 的 state backend
-在 out.collect()之后,第一个 operator 再 访问数据
·存储的 value 可能是不明确的
· window state
· contextkey
4.新计算中 Object Reuse 的 限制
·在方法调用过程中记住输入对象是不安全的
·不要修改输入对象
·可能会修改了输出对象并且再次输出
· 在使用 Object reuse 时,确保使用不可变类型
五.序列化器与性能优化
1.Flink 的序列化系统
·数据导入时的序列化
—在向外部系统读写数据时的序列化(例如 Kafka)
·数据传输的序列化
—在 Flink task 间交互数据时的序列化
·状态的序列化
—checkpoint 写时候的序列化,以及 RocksDB 的读写数据
2.Flink 的序列化系统
·原生支持的类型 TypeExtractor#privateGetForClass
·基本类型(Primitive type)
· Tuples, Scala Case Classes
·Avro 类型
·POJO 类型
·不原生支持的类型都会转为 kryo 进行序列化
3.Flink 的序列化系统
3. 自定义序列化器以提升性能
·通过 ExecutionConfig 来注册 kryo
· registerKryoType(Class<?>)
· 针对相应类提供 kryo 的序列化器(需要继承自:
com.esotericsoftware.kryo.Serializer )
4.自定义序列化器以提升性能
>利用 @TypeInfo 创建用户定义的 TypeInformation
5.自定义序列化器以提升性能
·创建 POJO 类借用 pojo serializer
· 类必须是 public 且不含静态内部类(no non-static inner class)
·类必须有一个无参构造方法 (no-argument constructor )
·所有非静态属性必须是(non- final)public 或者有 public 的 getter 和 setter
六、Object reuse
1.Object reuse 作业练习题
·源码下载
>将摄氏度转换为华氏温度
>利用指数移动平均求平均值
进入指标页面,观察每秒发送的 record 数量,从而获知作业 TPS
2. Object reuse 作业练习题
启动的 class 入口
-并发度设置为 2
·问题1∶如果开启 object reuse
(传入参数--objectReuse true),比较性能区别
·问题2∶注意观察为了保证正确性,MovingAverageSensors 和ConvertToLocalTemperature 类做了哪些努力
>启动的 class 入口
com.ververica.flinktraining.solutions.troubleshoot.0bjectReuseJobSolution1
>并发度设置为 2
问题 3∶
MovingAverageSensors 和 ConvertToLocalTemperature 的传输的对象是否可以用更高效的 serializer 来优化性能 (不用 kryo)?观察 metrics 中的每秒输出 record 数进行调优。
-Hint∶
com.ververica.flinktraining.solutions.troubleshoot.immutable下有提供相关不可变类以及自定义的 serializer
如何使用 AutoPilot 对作业自动调优
目录
· AutoPilot 整体架构
· AutoPilot 自动调优策略
· AutoPilot 应用实操
· AutoPilot 使用注意事项
·智能诊断功能(VVP 2.4.0)
一、AutoPilot 整体架构
1.Agent基本工作流程
二、AutoPilot 自动调优策略
1.基于 Cpu 利用率的调优策略
·CPU 利用率高于指定阈值时,进行 scale up
·CPU 利用率长时间低于指定阈值时,进行 s cale down
·IO Bound 类型的作业需要把 CPU 的阈值调小
·优点∶简单高效, 没有额外依赖
·缺点∶算法精度低,收敛速度慢
2.基于内存使用的调优策略
·内存的调优策略主要是根据作业内存实际利用率,以及 GC 的 metric 来动态调整tm 的内存
·整体内存利用率高于指定阈值时,增大 TM 的内存
·整体内存利用率长时间低于指定阈值时,减少 TM 的内存
·TM GC 时 间、GC 频率超过阈值时,增大 TM 的内存
3.基于 Delay 的调优策略
·当 Source 出现延迟时,此时作业处于反压的状态吞吐达到最大
·根据 Delay 的变化率,以及反压状态下的吞吐,可以快速的预估出作业实际需要处理的 TPS 和并发度
·优点∶算法精度高,可以实现作业快速回复健康状态
·缺点∶依赖 delay metric,支持大部分 SQL 作业中用到的 connector
已支持社区 FLIP-33 标准化的 Connector Metrics,覆盖所有实现这一标准 metric体系的 connector
4.基于 Slot 利用率的调优策略
· Flink-1.11 开始完成作业数据处理模型的升级,Mailbox 模式变成默认处理模式,每个任务处理模型简化,变成单线程,更加高效,为了衡量一个 mail 是否延迟,我们引入 IdleTime 的 metric∶用来统计一个 task 没有处理数据的时间
·综合所有并发的 IdeTime,我们可以快速衡量一个 vertex 是否空闲,能不能进讲行并发度的回收
· 优点∶算法精度高,可以准确的衡量每 个 vertex 的空闲程度
能根据slot利用率来进行 scale up/scale down 作业的并发度
5.基于 JobException 的调优策略
· 作业在启动或运行过程中,因为内存不足或者其它原因会造成作业持续 failover
· Autopilot 会对 job Exception 进行根因诊断,通过调整作业资源来保持作业的稳定
· 优点∶
自动恢复因为内存配置不足的作业
有效解决 JM OOM 的场景
6.策略冲突怎么办?
AutoPilot 会智能判断选择!
7.AutoPilot 三种工作形态
·Active
· Monitor(默认开启)
·Disabled (非常不建议)
8.AutoPilot 高级配置
· 限制调整资源的上下限
· 调整各个 trigger 指标的阈值
· 调整 scale up/down 动作的 ratio
· 限制调整并发的上下限
四、AutoPilot 应用实操
1. 开启 AutoPilot
2.查看作业的当前状态
3.查看历史操作记录
4.未开启/开启 AutoPilot 效果
五、 AutoPilot 使用注意事项
1.AutoPilot 使用注意事项
· AutoPilot 修改并发度是通过默认的并发度来实现的,因此作业每个节点都不能设置并发度,否则就无法实现动态调节
· AutoPilot 触发后需要重启作业,会导致作业短暂停止处理数据
· AutoPilot 策略对作业的处理模式假设∶
1.流量平滑变化、不能有数据倾斜、每个算子的吞吐能力能够随并发度线性拓展
2.当作业 pattern 严重偏离这几个假设时,可能会存在作业异常,没有触发 scale,或者算法无法收敛,作业持续重启等异常场景,此时需要关闭 AutoPilot 手动调优
· AutoPilot 目前无法识别外部系统的问题∶当外部系统故障,访问变慢时,会导致作业 scale up,导致输出压力变大,
会加重外部系统的压力,导致外部系统雪崩
· Autopilot 目前不支持 session cluster 部署的作业
· Deployment 后需要手动重启作业
>作业状态
> Failover 汇总
>Check Poin t 状态
>数据倾斜
>数据过滤
>K8S 资源
> Slot 利用率
>SQL聚合优化
>TM/JM cpu/mem 状态
3.下图左侧就是作业的信息