1. slot、taskmanager、并行度
1)TaskManager/slots与cpu的关系
flink的每个TaskManager为集群提供slot。插槽的数量通常与每个TaskManager的可用CPU内核数成比例。一般情况下你的slot数是你每个TM的cpu的核数。 经验上讲Slot的数量与CPU-core的数量一致为好。但考虑到超线程,可以让slotNumber=2*cpuCore。
2) slot 与并行度
一般我们设置task的并行度不能超过slot的数量。 一个Task的并行度等于分配给它的Slot个数(前提槽资源充足)。
3)taskmanager、slot、并行度之间的关系?
在Yarn集群中Job分离模式下,Taskmanger的数量=ceil(slot数量/并行度)。slotNumber>=taskmanger*并行度
一些概念:
1.task:一个task可以看成是一条完整的算子连(source -> sink),在默认情况下,且线程够用的情况下,一个task运行在一个slot里面; 2.subtask:task的组成单位,一个subtask由一个线程来运行; 3.operator chain:subtask中的算子连成链; 4.形成subtask的规则: ①当数据发生重定向时例如KeyBy操作。 ②当程序算子并行度发生变化时。 ③通过算子手动切分Operator(disableOperatorChaining,startNewChain,disableChaining) 5.slot:隔绝内存,不隔绝cpu意味着一个slot可以有多个线程,所以,一个slot里面可以运行多个subtask,这是一种优化,相同资源组在同一个slot; 6.disableOperatorChaining:默认为开启,通过 env.disableOperatorChaining() 禁用; 7.startNewChain: 从当前算子之前开始划分一条新的Operator chain,在算子之后调用; 8.disableChaining: 针对某一个算子操作,断开算子前后的Operator chain,使算子单独作为一个subtask。 9. 共享资源槽【slotSharingGroup("name")】:相同资源组的subtask在一个slot执行,见图2,如果算子调用之后,从这个算子开始,接下来的算子在一个组内(slot内);
4):Task和Subtask
- 一个task就代表一个operator,同样的一个算子链也是一个task
- 当运行flink job时,一个task根据本身的并行度创建多个sub Task,即sub task的数量代表这个task的并行度。
- 每个subtask需要一个线程来运行。
简单地,计算一个job的并行度:所有Operator和它的并行度的乘积。
5):chain的含义与作用?
Flink会尽可能地将多个operator链接(chain)在一起形成一个task pipline。每个task pipline在一个线程中执行。
优点:它能减少线程之间的切换,减少消息的序列化/反序列化,减少数据在缓冲区的交换(即降低本地数据交换成本),减少了延迟的同时提高整体的吞吐量。提升了任务的执行效率。
形成算子链的条件?
同一个slot共享组;
并行度相同;
one to one操作(map、fliter、flatMap 等算子都是 one-to-one 的对应关系)。
上下游的并行度一致(槽一致) 该节点必须要有上游节点跟下游节点; 下游StreamNode的输入StreamEdge只能有一个) 上下游节点都在同一个 slot group 中(下面会解释 slot group) 下游节点的 chain 策略为 ALWAYS(可以与上下游链接,map、flatmap、filter等默认是ALWAYS) 上游节点的 chain 策略为 ALWAYS 或 HEAD(只能与下游链接,不能与上游链接,Source默认是HEAD) 上下游算子之间没有数据shuffle (数据分区方式是 forward) 用户没有禁用 chain
one-to-one :stream维护着分区以及元素的顺序(比如source和map之间)。这意味着map 算子的子任务看到的元素的个数以及顺序跟 source 算子的子任务生产的元素的个数、顺序相同。map、fliter、flatMap等算子都是one-to-one的对应关系。
Redistributing:stream的分区会发生改变。每一个算子的子任务依据所选择的transformation发送数据到不同的目标任务。例如,keyBy 基于 hashCode 重分区、而 broadcast 和 rebalance 会随机重新分区,这些算子都会引起redistribute过程,而 redistribute 过程就类似于 Spark 中的 shuffle 过程。
并行度不同的算子之前传递数据会进行重分区,Redistributing类型的算子也会进行重分区。
6)slot
slot 其实是一个线程,一个subtask使用一个slot的资源去运行,一个task由多个subtask组成; 因为有slot共享,flink可以将资源占用少和资源占用多的subtask放到一起,这样能充分利用资源; 有了slot共享,可以认为并行度最大(maxNum个subtask)的task所对应的并行度就是这个job所需的并行度。
TaskManager 是一个 JVM 进程,并会以独立的线程来执行一个task。为了控制一个 TaskManager 能接受多少个 task,Flink 提出了 Task Slot 的概念,
通过调整 task slot 的数量,用户可以定义task之间是如何相互隔离的。每个 TaskManager 有一个slot,也就意味着每个task运行在独立的 JVM 中。每个 TaskManager 有多个slot的话,也就是说多个task运行在同一个JVM中。而在同一个JVM进程中的task,可以共享TCP连接(基于多路复用)和心跳消息,可以减少数据的网络传输。也能共享一些数据结构,一定程度上减少了每个task的消耗。
subtask是调度的基本单元
TaskSlot是静态的概念,代表着Taskmanager具有的并发执行能力
parallelism是动态的概念,是指程序运行时实际使用的并发能力
Flink中slot是任务执行所申请资源的最小单元,同一个TaskManager上的所有slot都只是做了内存分离,没有做CPU隔离。
7)slot槽位共享
简单的说
为了高效使用资源,Flink默认允许一个job中不同task的subtask运行到一个slot中,就是Slot共享。
共享的意义
因为每个subtask消耗的资源不同,将资源占用少和资源占用多的subtask放到一起,而不是都高或者都低的subtask放到一起,这样能使得资源能得到充分利用。比如source和sink一般都是网络通讯的IO操作,相对于中间Operator,消耗资源并不大。
默认slot shareing是开启的,开启slot有两个好处:
计算一个job所需要的并行度:并行度最大的task所对应的并行度就是这个job所需的并行度。 更好的资源利用:将资源占用不多的source/map和资源占用较多的window算子放到一个slot,并且实现了占用资源较多的算子均匀分到了每个slot中,最后还有一个slot下跑了一整个pipeline。
同一个Job下的不同Task可一个放到同一个Slot中——处理槽共享分组;
8)slot槽位共享机制
Flink在调度任务分配Slot的时候遵循两个重要原则:
同一个Job中的同一分组中的不同Task可以共享同一个Slot; Flink是按照拓扑顺序依次从Source调度到sink。
在Flink中task需要按照一定规则共享Slot ,主要通过SlotSharingGroup和CoLocationGroup定义:
CoLocationGroup:强制将subTasksk放到同一个slot中,是一种硬约束:
保证把JobVertices的第n个运行实例和其他相同组内的JobVertices第n个实例运作在相同的slot中(所有的并行度相同的subTasks运行在同一个slot ); 主要用于迭代流(训练机器学习模型) ,用来保证迭代头与迭代尾的第i个subtask能被调度到同一个TaskManager上。
SlotSharingGroup
: 它是Flink中用来实现slot共享的类,尽可能的允许不同的JobVertices部署在相同的Slot中,但这是一种宽约束,只是尽量做到不能完全保证。
算子的默认group为default,所有任务可以共享同一个slot; 要想确定一个未做SlotSharingGroup设置的算子的group是什么,可以根据上游算子的 group 和自身是否设置 group共同确定(也就是说如果下游算子没有设置分组,它继承上游算子的分组); 为了防止不合理的共享,用户可以通过提供的API强制指定operator的共享组。因为不合理的共享槽资源(比如默认情况下所有任务共享所有的slot)会导致每个槽中运行的线程述增多,增加了机器负载。所以适当设置可以减少每个slot运行的线程数,从而整体上减少机器的负载。比如:someStream.filter(...).slotSharingGroup("group1")就强制指定了filter的slot共享组为group1。
任务链与处理槽共享组,前者是对执行效率的优化,后者是对内存资源的优化。
经验:调小tm的资源(cpu和memory),作业可以更好的分布。
9):flink中常见异常解决:
has no more allocated slots参数:
akka.ask.timeout = 600s
java.io.IOException: Too many open files
state.backend.rocksdb.files.open=-1;
泛型擦除错误:org.apache.flink.api.common.function.InvalidTypesException: The generic type parameters of '<class>' are missing
.returns(TypeInformation.of(new TypeHint<Tuple2<String, String>>() { })); .returns(TypeInformation.of(WordWithCount.class)); ...
akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://...]] after [10000 ms]
Akka超时导致,一般有两种原因: 一是集群负载比较大或者网络比较拥塞, 二是业务逻辑同步调用耗时的外部服务。 如果负载或网络问题无法彻底缓解,需考虑调大akka.ask.timeout参数的值(默认只有10秒);另外,调用外部服务时尽量异步操作(Async I/O)。
Checkpoint < cp_id> expired before completing
首先应检查CheckpointConfig.setCheckpointTimeout()方法设定的检查点超时,如果设的太短,适当改长一点。另外就是考虑发生了反压或数据倾斜,或者barrier对齐太慢。
2. Flink 的一些总结
1:状态后端的对比
2:反压监控参数
3:keyedState和OperatorState的区别
4:checkpoint文件说明
drwxr-xr-x 4 abc 74715970 128B Sep 23 03:19 job_127b2b84f80b368b8edfe02b2762d10d_op_KeyedProcessOperator_0d49016af99997646695a030f69aa7ee__1_1__uuid_65b50444-5857-4940-9f8c-77326cc79279/db drwxr-xr-x 4 abc 74715970 128B Sep 23 03:20 job_127b2b84f80b368b8edfe02b2762d10d_op_StreamFlatMap_11f49afc24b1cce91c7169b1e5140284__1_1__uuid_19b333d3-3278-4e51-93c8-ac6c3608507c/db
3. Flink state最佳实践
Operator state 使用建议
慎重使用长 list
由于 operator state 没有 key group 的概念,所以为了实现改并发恢复的功能,需要对 operator state 中的每一个序列化后的元素存储一个位置偏移 offset,也就是构成了上图红框中的 offset 数组。
那么如果你的 operator state 中的 list 长度达到一定规模时,这个 offset 数组就可能会有几十 MB 的规模,关键这个数组是会返回给 job master,当 operator 的并发数目很大时,很容易触发 job master 的内存超用问题。我们遇到过用户把 operator state 当做黑名单存储,结果这个黑名单规模很大,导致一旦开始执行 checkpoint,job master 就会因为收到 task 发来的“巨大”的 offset 数组,而内存不断增长直到超用无法正常响应。
正确使用 UnionListState
union list state 目前被广泛使用在 kafka connector 中,不过可能用户日常开发中较少遇到,他的语义是从检查点恢复之后每个并发 task 内拿到的是原先所有operator 上的 state,如下图所示:
Keyed state 使用建议
如何正确清空当前的 state
KeyedState使用建议
ValueState大状态内存优化
使用ValueState+HashMapStateBacked内存过大时,建议从以下分析:
ValueState容量缩减
ValueState数量减少
RawState使用建议:
RocksDB使用建议:
checkpoint使用建议