Flink内存管理
Flink
内存组成整体结构图如下:
配置Flink进程的内存
FlinkJVM进程的进程总内存(TotalProcessMemory)包含了由Flink应用使用的内存(Flink总内存)以及由运行Flink的JVM使用的内存。Flink总内存(TotalFlinkMemory)包括JVM堆内存(HeapMemory)和堆外内存(Off-HeapMemory)。其中堆外内存包括直接内存(DirectMemory)和本地内存(NativeMemory)。
配置JobManager内存
JobManager是Flink集群的控制单元。它由三种不同的组件组成:ResourceManager、Dispatcher和每个正在运行作业的JobMaster。
配置TaskManager内存
Flink的TaskManager负责执行用户代码。根据实际需求为TaskManager配置内存将有助于减少Flink的资源占用,增强作业运行的稳定性。
如上图所示,下表中列出了FlinkTaskManager内存模型的所有组成部分,以及影响其大小的相关配置参数。
组成部分 | 配置参数 | 描述 |
FrameworkHeapMemory |
taskmanager.memory.framework.heap.size |
用于Flink框架的JVM堆内存(进阶配置)。 |
TaskHeapMemory |
taskmanager.memory.task.heap.size |
用于Flink应用的算子及用户代码的JVM堆内存。 |
Managedmemory |
|
由Flink管理的用于排序、哈希表、缓存中间结果及RocksDBStateBackend的本地内存。 |
FrameworkOff-heapMemory |
taskmanager.memory.framework.off-heap.size |
用于Flink框架的堆外内存(直接内存或本地内存)(进阶配置)。 |
TaskOff-heapMemory |
taskmanager.memory.task.off-heap.size |
用于Flink应用的算子及用户代码的堆外内存(直接内存或本地内存)。 |
NetworkMemory |
|
用于任务之间数据传输的直接内存(例如网络传输缓冲)。该内存部分为基于Flink总内存的受限的等比内存部分。 |
JVMMetaspace |
taskmanager.memory.jvm-metaspace.size |
FlinkJVM进程的Metaspace。 |
JVMOverhead |
|
框架内存
通常情况下,不建议对框架堆内存和框架堆外内存进行调整。除非你非常肯定Flink的内部数据结构及操作需要更多的内存。这可能与具体的部署环境及作业结构有关,例如非常高的并发度。此外,Flink的部分依赖(例如Hadoop)在某些特定的情况下也可能会需要更多的直接内存或本地内存。
内存计算
假设只配置TotalProcessMemory
=2gb
TotalProcessMemory
=TotalFlinkMemory
+JVMMetaspace
+JVMOverhead
JVMMetaspace
通过taskmanager.memory.jvm-metaspace.size
配置,默认96mb
JVMOverhead
计算:
配置参数 | 默认值 |
taskmanager.memory.jvm-overhead.min |
192mb |
taskmanager.memory.jvm-overhead.max |
1gb |
taskmanager.memory.jvm-overhead.fraction |
0.1 |
首先计算JVMOverhead
=TotalProcessMemory
*taskmanager.memory.jvm-overhead.fraction
=2gb*0.1=204.8mb
如果计算出来的JVMOverhead
<taskmanager.memory.jvm-overhead.min
则为taskmanager.memory.jvm-overhead.min
;
如果计算出来的JVMOverhead
>taskmanager.memory.jvm-overhead.max
则为taskmanager.memory.jvm-overhead.max
;
如果计算出来的taskmanager.memory.jvm-overhead.min
<=JVMOverhead
<=taskmanager.memory.jvm-overhead.max
则为JVMOverhead
.
因为192mb<204.8mb<1gb,所以 JVMOverhead
=204.8mb
TotalFlinkMemory
=TotalProcessMemory
-JVMMetaspace
-JVMOverhead
=2gb-96mb-204.8mb=1747.2mb
Managedmemory
=TotalFlinkMemory
*taskmanager.memory.managed.fraction
=1747.2mb*0.4=698.88mb
FrameworkOff-heapMemory
通过taskmanager.memory.framework.off-heap.size
配置,默认128mb
TaskOff-heapMemory
默认为0byte
NetworkMemory
计算方式同JVMoverhead
配置参数 | 默认值 |
taskmanager.memory.network.min |
64mb |
taskmanager.memory.network.max |
1gb |
taskmanager.memory.network.fraction |
0.1 |
TotalFlinkMemory
*taskmanager.memory.network.fraction
=1747.2mb*0.1=174.72mb
64mb<174.72mb<1gb,所以 NetworkMemory
=174.72mb
DirectMemory
=FrameworkOff-heapMemory
+TaskOff-heapMemory
+NetworkMemory
=128mb+0+174.72mb=302.72mb
**OffHeapMemory
**=Managedmemory
+DirectMemory
=698.88mb+302.72mb=1001.6mb
JVMHeap
=TotalFlinkMemory
-OffHeapMemory
=1747.2mb-1001.6mb=745.6mb 与web-ui显示一致
FrameworkHeapMemory
通过taskmanager.memory.framework.heap.size
配置, 默认128mb
TaskHeap
剩下的内存JVMHeap
-FrameworkHeapMemory
=617.6mb
Flink内存数据结构
Flink的内存管理和操作系统管理内存一样,将内存划分为内存段、内存页等结构。
3.1Flink内存段
内存段在Flink内部叫MemorySegment,是Flink中最小的内存分配单元,默认大小32KB。它既可以是堆上内存(Java的byte数组),也可以是堆外内存(基于Netty的DirectByteBuffer),同时提供了对二进制数据进行读取和写入的方法。
HeapMemorySegment:用来分配堆上内存;
HybridMemorySegment:用来分配堆外内存和堆上内存;2017年以后的版本实际上只使用了HybridMemorySegment。
通过一个案例介绍Flink在序列化和反序列化过程中如何使用MemorySegment:
如上图所示,当创建一个Tuple3对象时,包含三个层面,一是int类型,一是double类型,还有一个是Person。Person对象包含两个字段,一是int型的ID,另一个是String类型的name,
(1)在序列化操作时,会委托相应具体序列化的序列化器进行相应的序列化操作。从图中可以看到Tuple3会把int类型通过IntSerializer进行序列化操作,此时int只需要占用四个字节。
(2)Person类会被当成一个Pojo对象来进行处理,PojoSerializer序列化器会把一些属性信息使用一个字节存储起来。同样,其字段则采取相对应的序列化器进行相应序列化,在序列化完的结果中,可以看到所有的数据都是由MemorySegment去支持。
3.2Flink内存页
内存页是MemorySegment之上的数据访问视图,数据读取抽象为DataInputView,数据写入抽象为DataOutputView。使用时就无需关心MemorySegment的细节,该层会自动处理跨MemorySegment的读取和写入。
3.2.1DataInputView
DataInputView是从MemorySegment数据读取抽象视图,继承自java.io.DataInput。InputView中持有多个MemorySegment的引用(MemorySegment[]),这一组MemorySegment被视为一个内存页(Page),可以顺序读取MemorySegment中的数据。如下图为继承关系图:
3.2.2DataOutputView
DataOutputView是从MemorySegment数据读取抽象视图,继承自java.io.DataOutput。OutputView中持有多个MemorySegment的引用(MemorySegment[]),这一组MemorySegment被视为一个内存页(Page),可以顺序地向MemorySegment中写入数据。如下图为继承关系图:
3.2.3Buffer
Buffer是具有引用计数的MemorySegment实例的包装器。用来将上游Task算子处理完毕的结果交给下游时定义的一个抽象或者内存对象。
Buffer的接口是网络层面上传输数据和事件的统一抽象,其实现类是NetworkBuffer。Flink在各个TaskManger之间传递数据时,使用的是这一层的抽象。1个NetworkBuffer中包装了一个MemorySegment。Buffer接口类图如下:
Buffer的底层是MemorySegment,Buffer申请和释放由Flink自行管理,Flink引入了引用数的概念。当有新的Buffer消费者时,引用数加1,当消费者消费完Buffer时,引用数减1,最终当引用数变为0时,就可以将Buffer释放重用了。
3.2.4Buffer资源池
Buffer资源池在Flink中叫作BufferPool。BufferPool用来管理Buffer,包含Buffer的申请、释放、销毁、可用Buffer的通知等,其实现类是LocalBufferPool,每个Task拥有自己的LocalBufferPool。
BufferPool的类体系如下:
网络缓冲器(NetworkBuffer)
网络缓冲器(NetworkBuffer)是网络交换数据的包装,其对应于MemorySegment内存段。
NetworkBuffer,顾名思义,就是在网络传输中使用到的Buffer(实际非网络传输也会用到)。Flink经过网络传输的上下游Task的设计会比较类似生产者-消费者模型。
如果没有这个缓冲区,那么生产者或消费者会消耗大量时间在等待下游拿数据和上游发数据的环节上。加上这个缓冲区,生产者和消费者解耦开,任何一方短时间内的抖动理论上对另一方的数据处理都不会产生太大影响。如下图所示:
这是对于单进程内生产者-消费者模型的一个图示,事实上,如果两个Task在同一个TaskManager内,那么使用的就是上述模型,
对于不同TM内、或者需要跨网络传输的TM之间,利用生产者-消费者模型来进行数据传输的原理图如下:
可以看到,在NettyServer端,buffer只存在LocalBufferPool中,subpartition自己并没有缓存buffer或者独享一部分buffer,而在接收端,channel有自己独享的一部分buffer(ExclusiveBuffers),也有一部分共享的buffer(FloatingBuffers),所以,NetworkBuffer的使用同时存在于发送端和接收端。
由此可见,TaskManager内需要的buffers数量等于这个TaskManager内的所有Task中的发送端和接收端使用到的networkbuffer总和。明确了NetworkBuffer使用的位置,我们可以结合一些参数计算出作业实际所需的NetworkBuffer数量。
Flink内存调优
了解了FlinkJobManagerMemory和TaskManagerMemory的内存模型和数据结构之后,应该针对不同的部署情况,配置不同的内存,下面我们针对不同的部署方式介绍内存如何调优。
5.1为Standalone配置内存
建议为Standalone配置Flink总内存,设置JobManager和TaskManager的flink.size大小,声明为Flink本身提供了多少内存。配置方式如下:
参数配置: taskmanager.memory.flink.size: jobmanager.memory.flink.size:
5.2为Containers(容器)配置内存
建议为容器化部署(Kubernetes或Yarn)配置总进程内存,设置process.size大小,它声明了总共应该分配多少内存给FlinkJVM进程,并对应于请求容器的大小。配置方式如下:
参数配置: taskmanager.memory.process.size: jobmanager.memory.process.size:
注意:如果你配置了总Flink内存,Flink会隐式添加JVM内存组件来推导总进程内存,并请求一个具有该推导大小的内存的容器。
警告:如果Flink或用户代码分配超出容器大小的非托管堆外(本机)内存,作业可能会失败,因为部署环境可能会杀死有问题的容器。
5.3为statebackends(状态后端)配置内存
为statebackends(状态后端)配置内存时,这仅与TaskManager相关。
在部署Flink流应用程序时,所使用的状态后端类型将决定集群的最佳内存配置。
5.3.1HashMap状态后端
运行无状态作业或使用HashMapStateBackend时,将托管内存设置为零。这将确保为JVM上的用户代码分配最大数量的堆内存。配置如下:
配置参数:设置size:0 taskmanager.memory.managed.size:0
5.3.2RocksDB状态后端
该EmbeddedRocksDBStateBackend使用本机内存。默认情况下,RocksDB设置为将本机内存分配限制为托管内存的大小。因此,为你的状态保留足够的托管内存非常重要。如果禁用默认的RocksDB内存控制,RocksDB分配的内存超过请求的容器大小(总进程内存)的限制,则可以在容器化部署中终止TaskManager。
5.4为batchJob(批处理作业)配置内存
为batchJob(批处理作业)配置内存时,这仅与TaskManager相关。
Flink的批处理操作符利用托管内存来更高效地运行。这样做时,可以直接对原始数据执行某些操作,而无需反序列化为Java对象。这意味着托管内存配置对应用程序的性能有实际影响。Flink将尝试分配和使用为批处理作业配置的尽可能多的托管内存,但不会超出其限制。这可以防止OutOfMemoryError’s,因为Flink准确地知道它必须利用多少内存。如果托管内存不足,Flink会优雅地溢出到磁盘。
详见:(98条消息)Flink内存模型、网络缓冲器、内存调优、故障排除_一个写湿的程序猿的博客-CSDN博客_flinklocalbufferpool