flink的常见知识点总结(一)

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
云原生内存数据库 Tair,内存型 2GB
云数据库 Redis 版,社区版 2GB
推荐场景:
搭建游戏排行榜
简介: flink的常见知识点总结(一)

1. slot、taskmanager、并行度

1)TaskManager/slots与cpu的关系

flink的每个TaskManager为集群提供slot。插槽的数量通常与每个TaskManager的可用CPU内核数成比例。一般情况下你的slot数是你每个TM的cpu的核数。
经验上讲Slot的数量与CPU-core的数量一致为好。但考虑到超线程,可以让slotNumber=2*cpuCore。


2) slot 与并行度

一般我们设置task的并行度不能超过slot的数量。
一个Task的并行度等于分配给它的Slot个数(前提槽资源充足)。


3)taskmanager、slot、并行度之间的关系?

在Yarn集群中Job分离模式下,Taskmanger的数量=ceil(slot数量/并行度)。slotNumber>=taskmanger*并行度


一些概念:

1.task:一个task可以看成是一条完整的算子连(source -> sink),在默认情况下,且线程够用的情况下,一个task运行在一个slot里面;
2.subtask:task的组成单位,一个subtask由一个线程来运行;
3.operator chain:subtask中的算子连成链;
4.形成subtask的规则:
①当数据发生重定向时例如KeyBy操作。
②当程序算子并行度发生变化时。
③通过算子手动切分Operator(disableOperatorChaining,startNewChain,disableChaining)
5.slot:隔绝内存,不隔绝cpu意味着一个slot可以有多个线程,所以,一个slot里面可以运行多个subtask,这是一种优化,相同资源组在同一个slot;
6.disableOperatorChaining:默认为开启,通过 env.disableOperatorChaining() 禁用;
7.startNewChain: 从当前算子之前开始划分一条新的Operator chain,在算子之后调用;
8.disableChaining: 针对某一个算子操作,断开算子前后的Operator chain,使算子单独作为一个subtask。
9. 共享资源槽【slotSharingGroup("name")】:相同资源组的subtask在一个slot执行,见图2,如果算子调用之后,从这个算子开始,接下来的算子在一个组内(slot内);

4):Task和Subtask

  • 一个task就代表一个operator,同样的一个算子链也是一个task
  • 当运行flink job时,一个task根据本身的并行度创建多个sub Task,即sub task的数量代表这个task的并行度。
  • 每个subtask需要一个线程来运行。

简单地,计算一个job的并行度:所有Operator和它的并行度的乘积。


5):chain的含义与作用?

Flink会尽可能地将多个operator链接(chain)在一起形成一个task pipline。每个task pipline在一个线程中执行。

优点:它能减少线程之间的切换,减少消息的序列化/反序列化,减少数据在缓冲区的交换(即降低本地数据交换成本),减少了延迟的同时提高整体的吞吐量。提升了任务的执行效率。

形成算子链的条件?

同一个slot共享组;

并行度相同;

one to one操作(map、fliter、flatMap 等算子都是 one-to-one 的对应关系)。

上下游的并行度一致(槽一致)
该节点必须要有上游节点跟下游节点;
下游StreamNode的输入StreamEdge只能有一个) 
上下游节点都在同一个 slot group 中(下面会解释 slot group) 
下游节点的 chain 策略为 ALWAYS(可以与上下游链接,map、flatmap、filter等默认是ALWAYS) 
上游节点的 chain 策略为 ALWAYS 或 HEAD(只能与下游链接,不能与上游链接,Source默认是HEAD) 
上下游算子之间没有数据shuffle (数据分区方式是 forward) 
用户没有禁用 chain

one-to-one :stream维护着分区以及元素的顺序(比如source和map之间)。这意味着map 算子的子任务看到的元素的个数以及顺序跟 source 算子的子任务生产的元素的个数、顺序相同。map、fliter、flatMap等算子都是one-to-one的对应关系。

Redistributing:stream的分区会发生改变。每一个算子的子任务依据所选择的transformation发送数据到不同的目标任务。例如,keyBy 基于 hashCode 重分区、而 broadcast 和 rebalance 会随机重新分区,这些算子都会引起redistribute过程,而 redistribute 过程就类似于 Spark 中的 shuffle 过程。

并行度不同的算子之前传递数据会进行重分区,Redistributing类型的算子也会进行重分区。

6)slot

slot 其实是一个线程,一个subtask使用一个slot的资源去运行,一个task由多个subtask组成;
因为有slot共享,flink可以将资源占用少和资源占用多的subtask放到一起,这样能充分利用资源;
有了slot共享,可以认为并行度最大(maxNum个subtask)的task所对应的并行度就是这个job所需的并行度。


TaskManager 是一个 JVM 进程,并会以独立的线程来执行一个task。为了控制一个 TaskManager 能接受多少个 task,Flink 提出了 Task Slot 的概念,

通过调整 task slot 的数量,用户可以定义task之间是如何相互隔离的。每个 TaskManager 有一个slot,也就意味着每个task运行在独立的 JVM 中。每个 TaskManager 有多个slot的话,也就是说多个task运行在同一个JVM中。而在同一个JVM进程中的task,可以共享TCP连接(基于多路复用)和心跳消息,可以减少数据的网络传输。也能共享一些数据结构,一定程度上减少了每个task的消耗。

subtask是调度的基本单元

TaskSlot是静态的概念,代表着Taskmanager具有的并发执行能力

parallelism是动态的概念,是指程序运行时实际使用的并发能力

Flink中slot是任务执行所申请资源的最小单元,同一个TaskManager上的所有slot都只是做了内存分离,没有做CPU隔离


7)slot槽位共享

简单的说

为了高效使用资源,Flink默认允许一个job中不同task的subtask运行到一个slot中,就是Slot共享。


共享的意义

因为每个subtask消耗的资源不同,将资源占用少和资源占用多的subtask放到一起,而不是都高或者都低的subtask放到一起,这样能使得资源能得到充分利用。比如source和sink一般都是网络通讯的IO操作,相对于中间Operator,消耗资源并不大。


默认slot shareing是开启的,开启slot有两个好处:

计算一个job所需要的并行度:并行度最大的task所对应的并行度就是这个job所需的并行度。
更好的资源利用:将资源占用不多的source/map和资源占用较多的window算子放到一个slot,并且实现了占用资源较多的算子均匀分到了每个slot中,最后还有一个slot下跑了一整个pipeline。

同一个Job下的不同Task可一个放到同一个Slot中——处理槽共享分组;

8)slot槽位共享机制

Flink在调度任务分配Slot的时候遵循两个重要原则:

同一个Job中的同一分组中的不同Task可以共享同一个Slot;
Flink是按照拓扑顺序依次从Source调度到sink。


在Flink中task需要按照一定规则共享Slot ,主要通过SlotSharingGroup和CoLocationGroup定义:

CoLocationGroup:强制将subTasksk放到同一个slot中,是一种硬约束:

保证把JobVertices的第n个运行实例和其他相同组内的JobVertices第n个实例运作在相同的slot中(所有的并行度相同的subTasks运行在同一个slot );
主要用于迭代流(训练机器学习模型) ,用来保证迭代头与迭代尾的第i个subtask能被调度到同一个TaskManager上。

SlotSharingGroup: 它是Flink中用来实现slot共享的类,尽可能的允许不同的JobVertices部署在相同的Slot中,但这是一种宽约束,只是尽量做到不能完全保证。

算子的默认group为default,所有任务可以共享同一个slot; 
要想确定一个未做SlotSharingGroup设置的算子的group是什么,可以根据上游算子的 group 和自身是否设置 group共同确定(也就是说如果下游算子没有设置分组,它继承上游算子的分组); 
为了防止不合理的共享,用户可以通过提供的API强制指定operator的共享组。因为不合理的共享槽资源(比如默认情况下所有任务共享所有的slot)会导致每个槽中运行的线程述增多,增加了机器负载。所以适当设置可以减少每个slot运行的线程数,从而整体上减少机器的负载。比如:someStream.filter(...).slotSharingGroup("group1")就强制指定了filter的slot共享组为group1。

任务链与处理槽共享组,前者是执行效率的优化,后者是对内存资源的优化

经验:调小tm的资源(cpu和memory),作业可以更好的分布


9):flink中常见异常解决:

has no more allocated slots参数:

akka.ask.timeout = 600s

java.io.IOException: Too many open files

state.backend.rocksdb.files.open=-1;

泛型擦除错误:org.apache.flink.api.common.function.InvalidTypesException: The generic type parameters of '<class>' are missing

.returns(TypeInformation.of(new TypeHint<Tuple2<String, String>>() { }));
.returns(TypeInformation.of(WordWithCount.class));
...

akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://...]] after [10000 ms]

Akka超时导致,一般有两种原因:
一是集群负载比较大或者网络比较拥塞,
二是业务逻辑同步调用耗时的外部服务。
如果负载或网络问题无法彻底缓解,需考虑调大akka.ask.timeout参数的值(默认只有10秒);另外,调用外部服务时尽量异步操作(Async I/O)。

Checkpoint < cp_id> expired before completing

首先应检查CheckpointConfig.setCheckpointTimeout()方法设定的检查点超时,如果设的太短,适当改长一点。另外就是考虑发生了反压或数据倾斜,或者barrier对齐太慢。


2. Flink 的一些总结

1:状态后端的对比

2:反压监控参数

3:keyedState和OperatorState的区别

4:checkpoint文件说明

drwxr-xr-x   4 abc  74715970   128B Sep 23 03:19 job_127b2b84f80b368b8edfe02b2762d10d_op_KeyedProcessOperator_0d49016af99997646695a030f69aa7ee__1_1__uuid_65b50444-5857-4940-9f8c-77326cc79279/db
drwxr-xr-x   4 abc  74715970   128B Sep 23 03:20 job_127b2b84f80b368b8edfe02b2762d10d_op_StreamFlatMap_11f49afc24b1cce91c7169b1e5140284__1_1__uuid_19b333d3-3278-4e51-93c8-ac6c3608507c/db

3. Flink state最佳实践

Operator state 使用建议
慎重使用长 list


由于 operator state 没有 key group 的概念,所以为了实现改并发恢复的功能,需要对 operator state 中的每一个序列化后的元素存储一个位置偏移 offset,也就是构成了上图红框中的 offset 数组。


那么如果你的 operator state 中的 list 长度达到一定规模时,这个 offset 数组就可能会有几十 MB 的规模,关键这个数组是会返回给 job master,当 operator 的并发数目很大时,很容易触发 job master 的内存超用问题。我们遇到过用户把 operator state 当做黑名单存储,结果这个黑名单规模很大,导致一旦开始执行 checkpoint,job master 就会因为收到 task 发来的“巨大”的 offset 数组,而内存不断增长直到超用无法正常响应。


正确使用 UnionListState

union list state 目前被广泛使用在 kafka connector 中,不过可能用户日常开发中较少遇到,他的语义是从检查点恢复之后每个并发 task 内拿到的是原先所有operator 上的 state,如下图所示:

Keyed state 使用建议

如何正确清空当前的 state

KeyedState使用建议



ValueState大状态内存优化
使用ValueState+HashMapStateBacked内存过大时,建议从以下分析:

ValueState容量缩减
ValueState数量减少


RawState使用建议:

RocksDB使用建议:

checkpoint使用建议

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
2月前
|
机器学习/深度学习 分布式计算 BI
Flink实时流处理框架原理与应用:面试经验与必备知识点解析
【4月更文挑战第9天】本文详尽探讨了Flink实时流处理框架的原理,包括运行时架构、数据流模型、状态管理和容错机制、资源调度与优化以及与外部系统的集成。此外,还介绍了Flink在实时数据管道、分析、数仓与BI、机器学习等领域的应用实践。同时,文章提供了面试经验与常见问题解析,如Flink与其他系统的对比、实际项目挑战及解决方案,并展望了Flink的未来发展趋势。附带Java DataStream API代码样例,为学习和面试准备提供了实用素材。
313 0
|
28天前
|
消息中间件 Java 关系型数据库
实时计算 Flink版操作报错合集之从 PostgreSQL 读取数据并写入 Kafka 时,遇到 "initial slot snapshot too large" 的错误,该怎么办
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
923 0
|
28天前
|
存储 SQL 关系型数据库
实时计算 Flink版操作报错合集之按时间恢复时,报错:在尝试读取binlog时发现所需的binlog位置不再可用,该怎么办
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
718 0
|
28天前
|
消息中间件 资源调度 Java
实时计算 Flink版操作报错合集之遇到了缺少包的错误,已经添加了相应的 jar 包,仍然出现同样的报错,该怎么解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
664 2
|
28天前
|
SQL JSON 数据库
实时计算 Flink版操作报错合集之写入Hudi时,遇到从 COW(Copy-On-Write)表类型转换为 MOR(Merge-On-Read)表类型时报字段错误,该怎么办
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
28天前
|
监控 Oracle 关系型数据库
实时计算 Flink版操作报错合集之在配置连接时,添加了scan.startup.mode参数后,出现报错。是什么导致的
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
816 0
|
28天前
|
消息中间件 Oracle 关系型数据库
实时计算 Flink版操作报错合集之连接RabbitMQ时遇到Could not find any factory for identifier 'rabbitmq' that implements 'org.apache.flink.table.factories.DynamicTableFactory'错误,该怎么办
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
401 0
|
28天前
|
SQL 关系型数据库 MySQL
实时计算 Flink版操作报错合集之CDC任务在异常后整个record sent从0初始化开始,是什么导致的
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
518 0
|
28天前
|
Java 关系型数据库 流计算
实时计算 Flink版操作报错合集之配置cats进行从MySQL到StarRocks的数据同步任务时遇到报错,该怎么办
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
308 0
|
28天前
|
关系型数据库 数据库 流计算
实时计算 Flink版操作报错合集之在使用Flink CDC TiDB Connector时,无法获取到事件,该怎么办
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
380 0