05 Flink TCP-based 反压机制(before V1.5)
示例:WindowWordCount
大体的逻辑就是从 Socket 里去接收数据,每 5s 去进行一次 WordCount,将这个代码提交后就进入到了编译阶段。
编译阶段:生成 JobGraph
这时候还没有向集群去提交任务,在 Client 端会将 StreamGraph 生成 JobGraph,JobGraph 就是做为向集群提交的最基本的单元。在生成 JobGrap 的时候会做一些优化,将一些没有 Shuffle 机制的节点进行合并。有了 JobGraph 后就会向集群进行提交,进入运行阶段。
运行阶段:调度 ExecutionGraph
obGraph 提交到集群后会生成 ExecutionGraph ,这时候就已经具备基本的执行任务的雏形了,把每个任务拆解成了不同的 SubTask,上图 ExecutionGraph 中的 Intermediate Result Partition 就是用于发送数据的模块,最终会将 ExecutionGraph 交给 JobManager 的调度器,将整个 ExecutionGraph 调度起来。然后我们概念化这样一张物理执行图,可以看到每个 Task 在接收数据时都会通过这样一个 InputGate 可以认为是负责接收数据的,再往前有这样一个 ResultPartition 负责发送数据,在 ResultPartition 又会去做分区跟下游的 Task 保持一致,就形成了 ResultSubPartition 和 InputChannel 的对应关系。这就是从逻辑层上来看的网络传输的通道,基于这么一个概念我们可以将反压的问题进行拆解。
问题拆解:反压传播两个阶段
反压的传播实际上是分为两个阶段的,对应着上面的执行图,我们一共涉及 3 个 TaskManager,在每个 TaskManager 里面都有相应的 Task 在执行,还有负责接收数据的 InputGate,发送数据的 ResultPartition,这就是一个最基本的数据传输的通道。在这时候假设最下游的 Task (Sink)出现了问题,处理速度降了下来这时候是如何将这个压力反向传播回去呢?这时候就分为两种情况:
- 跨 TaskManager ,反压如何从 InputGate 传播到 ResultPartition
- TaskManager 内,反压如何从 ResultPartition 传播到 InputGate
5.1 跨 TaskManager 数据传输)
前面提到,发送数据需要 ResultPartition,在每个 ResultPartition 里面会有分区 ResultSubPartition,中间还会有一些关于内存管理的 Buffer。
对于一个 TaskManager 来说会有一个统一的 Network BufferPool 被所有的 Task 共享,在初始化时会从 Off-heap Memory 中申请内存,申请到内存的后续内存管理就是同步 Network BufferPool 来进行的,不需要依赖 JVM GC 的机制去释放。有了 Network BufferPool 之后可以为每一个 ResultSubPartition 创建 Local BufferPool 。
如上图左边的 TaskManager 的 Record Writer 写了 <1,2> 这个两个数据进来,因为 ResultSubPartition 初始化的时候为空,没有 Buffer 用来接收,就会向 Local BufferPool 申请内存,这时 Local BufferPool 也没有足够的内存于是将请求转到 Network BufferPool,最终将申请到的 Buffer 按原链路返还给 ResultSubPartition,<1,2> 这个两个数据就可以被写入了。之后会将 ResultSubPartition 的 Buffer 拷贝到 Netty 的 Buffer 当中最终拷贝到 Socket 的 Buffer 将消息发送出去。然后接收端按照类似的机制去处理将消息消费掉。
接下来我们来模拟上下游处理速度不匹配的场景,发送端的速率为 2,接收端的速率为 1,看一下反压的过程是怎样的。
5.2 跨 TaskManager 反压过程
因为速度不匹配就会导致一段时间后 InputChannel 的 Buffer 被用尽,于是他会向 Local BufferPool 申请新的 Buffer ,这时候可以看到 Local BufferPool 中的一个 Buffer 就会被标记为 Used。
发送端还在持续以不匹配的速度发送数据,然后就会导致 InputChannel 向 Local BufferPool 申请 Buffer 的时候发现没有可用的 Buffer 了,这时候就只能向 Network BufferPool 去申请,当然每个 Local BufferPool 都有最大的可用的 Buffer,防止一个 Local BufferPool 把 Network BufferPool 耗尽。这时候看到 Network BufferPool 还是有可用的 Buffer 可以向其申请。
一段时间后,发现 Network BufferPool 没有可用的 Buffer,或是 Local BufferPool 的最大可用 Buffer 到了上限无法向 Network BufferPool 申请,没有办法去读取新的数据,这时 Netty AutoRead 就会被禁掉,Netty 就不会从 Socket 的 Buffer 中读取数据了。
显然,再过不久 Socket 的 Buffer 也被用尽,这时就会将 Window = 0 发送给发送端(前文提到的 TCP 滑动窗口的机制)。这时发送端的 Socket 就会停止发送。
很快发送端的 Socket 的 Buffer 也被用尽,Netty 检测到 Socket 无法写了之后就会停止向 Socket 写数据。
Netty 停止写了之后,所有的数据就会阻塞在 Netty 的 Buffer 当中了,但是 Netty 的 Buffer 是无界的,可以通过 Netty 的水位机制中的 high watermark 控制他的上界。当超过了 high watermark,Netty 就会将其 channel 置为不可写,ResultSubPartition 在写之前都会检测 Netty 是否可写,发现不可写就会停止向 Netty 写数据。
这时候所有的压力都来到了 ResultSubPartition,和接收端一样他会不断的向 Local BufferPool 和 Network BufferPool 申请内存。
Local BufferPool 和 Network BufferPool 都用尽后整个 Operator 就会停止写数据,达到跨 TaskManager 的反压。
5.3 TaskManager 内反压过程
了解了跨 TaskManager 反压过程后再来看 TaskManager 内反压过程就更好理解了,下游的 TaskManager 反压导致本 TaskManager 的 ResultSubPartition 无法继续写入数据,于是 Record Writer 的写也被阻塞住了,因为 Operator 需要有输入才能有计算后的输出,输入跟输出都是在同一线程执行, Record Writer 阻塞了,Record Reader 也停止从 InputChannel 读数据,这时上游的 TaskManager 还在不断地发送数据,最终将这个 TaskManager 的 Buffer 耗尽。具体流程可以参考下图,这就是 TaskManager 内的反压过程。
06 Flink Credit-based 反压机制(since V1.5)
6.1 TCP-based 反压的弊端
在介绍 Credit-based 反压机制之前,先分析下 TCP 反压有哪些弊端。
在一个 TaskManager 中可能要执行多个 Task,如果多个 Task 的数据最终都要传输到下游的同一个 TaskManager 就会复用同一个 Socket 进行传输,这个时候如果单个 Task 产生反压,就会导致复用的 Socket 阻塞,其余的 Task 也无法使用传输,checkpoint barrier 也无法发出导致下游执行 checkpoint 的延迟增大。
依赖最底层的 TCP 去做流控,会导致反压传播路径太长,导致生效的延迟比较大。
6.2 引入 Credit-based 反压
这个机制简单的理解起来就是在 Flink 层面实现类似 TCP 流控的反压机制来解决上述的弊端,Credit 可以类比为 TCP 的 Window 机制。
Credit-based 反压过程
如图所示在 Flink 层面实现反压机制,就是每一次 ResultSubPartition 向 InputChannel 发送消息的时候都会发送一个 backlog size 告诉下游准备发送多少消息,下游就会去计算有多少的 Buffer 去接收消息,算完之后如果有充足的 Buffer 就会返还给上游一个 Credit 告知他可以发送消息(图上两个 ResultSubPartition 和 InputChannel 之间是虚线是因为最终还是要通过 Netty 和 Socket 去通信),下面我们看一个具体示例。
假设我们上下游的速度不匹配,上游发送速率为 2,下游接收速率为 1,可以看到图上在 ResultSubPartition 中累积了两条消息,10 和 11, backlog 就为 2,这时就会将发送的数据 <8,9> 和 backlog = 2 一同发送给下游。下游收到了之后就会去计算是否有 2 个 Buffer 去接收,可以看到 InputChannel 中已经不足了这时就会从 Local BufferPool 和 Network BufferPool 申请,好在这个时候 Buffer 还是可以申请到的。
过了一段时间后由于上游的发送速率要大于下游的接受速率,下游的 TaskManager 的 Buffer 已经到达了申请上限,这时候下游就会向上游返回 Credit = 0,ResultSubPartition 接收到之后就不会向 Netty 去传输数据,上游 TaskManager 的 Buffer 也很快耗尽,达到反压的效果,这样在 ResultSubPartition 层就能感知到反压,不用通过 Socket 和 Netty 一层层地向上反馈,降低了反压生效的延迟。同时也不会将 Socket 去阻塞,解决了由于一个 Task 反压导致 TaskManager 和 TaskManager 之间的 Socket 阻塞的问题。
如果您看到任务的状态为“ 正常 ”,则表明没有背压。高,另一方面意味着任务是背压力。
08 背压指标含义
为了判断是否进行反压,jobmanager会每50ms触发100次stack traces。 Web界面中显示阻塞在内部方法调用的stacktraces占所有的百分比。 例如,0.01,代表着100次中有一次阻塞在内部调用。 OK: 0 <= Ratio <= 0.10 LOW: 0.10 < Ratio <= 0.5 HIGH: 0.5 < Ratio <= 1
可以使用以下配置为作业管理器配置样本数:
- web.backpressure.refresh-interval:不推荐使用的统计信息将在此之前过期,需要刷新(默认值:60000ms)。
- web.backpressure.num-samples:用来确定背压的样本数量(默认值:100)。
- web.backpressure.delay-between-samples:样品之间的延迟以确定背压(默认值:50 ms)
07 总结
网络流控是为了在上下游速度不匹配的情况下,防止下游出现过载
网络流控有静态限速和动态反压两种手段:
- Flink 1.5 之前是基于 TCP 流控 + bounded buffer 实现反压
- Flink 1.5 之后实现了自己托管的 credit – based 流控机制,在应用层模拟 TCP 的流控机制
就是每一次 ResultSubPartition 向 InputChannel 发送消息的时候都会发送一个 backlog size 告诉下游准备发送多少消息,下游就会去计算有多少的 Buffer 去接收消息,算完之后如果有充足的 Buffer 就会返还给上游一个 Credit 告知他可以发送消息
有了动态反压,静态限速是不是完全没有作用了?
实际上动态反压不是万能的,我们流计算的结果最终是要输出到一个外部的存储(Storage),外部数据存储到 Sink 端的反压是不一定会触发的,这要取决于外部存储的实现,像 Kafka 这样是实现了限流限速的消息中间件可以通过协议将反压反馈给 Sink 端,但是像 ES 无法将反压进行传播反馈给 Sink 端,这种情况下为了防止外部存储在大的数据量下被打爆,我们就可以通过静态限速的方式在 Source 端去做限流。所以说动态反压并不能完全替代静态限速的,需要根据合适的场景去选择处理方案