阿里二面:Flink内存管理是如何实现的?

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 阿里二面:Flink内存管理是如何实现的?

一、内存模型

640.png

从大的方面来说,TaskManager进程的内存模型分为JVM本身所使用的内存和Flink使用的内存,Flink使用了堆上内存和堆外内存。

640.png

1.Flink使用的内存

1)JVM堆上内存


              a.  框架堆上内存Framework Heap Memory。Flink框架本身所使用的的内存,即TaskManager本身所占用的堆上内存,不计入Slot的资源


       配置参数:taskmanager.memory.framework.heap.size = 128MB,默认128MB。


               b. Task堆上内存Task Heap Memory。Task 执行用户代码时所使用的堆上内存。


       配置参数:taskmanager.memory.taks.heap.size。


       2)JVM堆外内存


               a. 框架堆外内存Framework Off-Heap Memory。Flink 框架本身所使用的的内存。即TaskManager本身所占用的堆外内存,不计入Slot资源。


       配置参数:taskmanager.memroy.framework.off-heap.size = 128MB,默认128MB。


               b. Task堆外内存Task Off-Heap Memory。Task执行用户代码时所使用的堆外内存。


       配置参数:taskmanager.memory.task.off-heap.size = 0,默认为0。


               c. 网络缓冲内存Network Memory。网络数据交换所使用的堆外加内存大小,如网络数据交换缓冲区(Network Buffer)。


       配置参数:taskmanager.memory.network.[64/1024/0.1](min/max/fraction),默认 min = 64MB,max = 1gb,fraction = 0.1。


               d. 堆外托管内存 Manged Memory。Flink管理的堆外内存。


       配置参数:taskmanager.memory.managed.[size][fraction],默认fraction = 0.4。

2. JVM本身使用的内存

JVM本身直接使用了操作系统的内存。


1. JVM元空间


JVM 元空间所使用的的内存。


配置参数:taskmanager.memory.jvm-metaspace = 96m, 默认96MB。


2. JVM执行开销


JVM 在执行时自身所需要的的内容,包括线程堆栈、IO、编译缓存等所使用的的内存。


配置参数:taskmanager.memory.jvm-overhead = [min/max/faction]。默认min=192MB,max=1GB,fraction = 0.1。

3. 总体内存

1.Flink使用内存


 综上而言,Flink使用的内存包括Flink使用的堆上、堆外内存。使用参数taskmanager.memory.flink.size进行控制。


2. 进程使用内存


 整个进程所使用的内存,包括Flink使用的内存和JVM使用的内存。使用参数taskmanager.memory.process.size进行控制。


JVM内存控制参数如下:


1)JVM堆上内存,使用-Xmx 和 -Xms 参数进行控制


2)JVM直接内存,使用参数-XX:MaxDirectMemorySize 进行控制。对于托管内存,使用Unsafe.allocateMemory()申请,不受改参数控制。


3)JVM Metaspace 使用-XX:MaxMetaspaceSize进行控制。


二、内存数据结构

2.1 内存段

内存段在Flink内部叫作MemorySegment,是Flink的内存抽象的最小分配单元。默认情况下,一个MemorySegment 对应一个32kb大小的内存块。这块内存既可以是堆上内存(Java的byte数组),也可以是堆外内存(基于Netty的DirecByteBuffer)。


       MemorySegment同时也提供了对二进制数据进行读取和写入的方法。对于Java基本数据类型,如short、int、long等,MemorySegment 内置了方法,可以直接返回或者写入数据,对于其他类型,读取二进制数组byte[]后进行反序列化,序列化为二进制数组byte[]后写入。


1.MemorySegment结构


1)BYTE_ARRAY_BASE_OFFSET:二进制字节数组的起始索引,相对于字节数组对象而言。


2)LITTLE_ENDIAN:判断是否为Little Endian模式的字节存储顺序,若不是,就是Big Endian模式。


3)HeapMemory:如果MemorySegment使用堆上内存,则表示一个堆上的字节数组(byte[]),如果MemorySegment使用对外内存,则为null。


4)address:字节数组对应的相对地址(若HeapMemory为null,即可能为对外内存的绝对地址)。


5)addressLimit:标识地址结束位置(address + size)。


6)size:内存段的字节数。


2.字节顺序Big Endian与Little Endian

字节顺序是指占内存多于一个字节类型的数据在内存中的存放顺序,不同的CPU架构体系使用不同的存储顺序。PowerPC系列采用Big Endian方式存储数据,低地址存放最高有效字节(MSB 高位编址),而x86 系列则采用Little Endian方式存储数据,低地址存放最低有效字节(LSB 低位编址)

640.png


3. MemorySegment实现

Flink的MemorySegment有堆上和堆外两种实现:


640.png


HeapMemorySegment 用来分配堆上内存,HybridMemorySegment用来分配堆外内存和堆上内存。实际上在2017年之后的Flink中,并没有使用HeapMemorySegment,而是使用 HybridMemorySegment这个类来同时实现堆上和堆外内存的分配。


2.2 内存页

内存页是MemorySegment之上的数据访问视图,数据读取抽象为DataInputView,数据写入抽象为DataOutputView。有了这一层,上层使用者无须关心MemorySegment的细节,该层会自动处理跨MemorySegment的读取和写入。

1.DataInputView


640.png


DataInputView 是从MemorySegment数据读取抽象视图,继承自java.io.DataInput.InputView中持有多个MemorySegment的引用(MemorySegment[]),这一组·MemorySegment被视为一个内存页(Page),可以顺序读取MemorySegment中的数据。


2.DataOutputView


640.png


DataOutputView 是从MemorySegment数据读取抽象视图,继承自java.io.DataOutput.OutputView中持有多个MemorySegment的引用(MemorySegment[]),这一组·MemorySegment被视为一个内存页(Page),可以顺序地向MemorySegment中写入数据。


2.3 Buffer

Task算子处理数据完毕,将结果交给下游的时候,使用的抽象或者说内存对象是Buffer。Buffer的接口是网络层面上传输数据和事件的统一抽象,其实现类是NetworkBuffer。Flink在各个TaskManger之间传递数据时,使用的是这一层的抽象。1个NetworkBuffer中包装了一个MemorySegment。


3d6854637a24c184d4aadc5ee841465e.png


Buffer 的底层是MemorySegment,Buffer申请和释放由Flink自行管理,Flink引入了引用数的概念。当有新的Buffer消费者时,引用数加1,当消费者消费完Buffer时,引用数减1,最终当引用数变为0时,就可以将Buffer释放重用了。


AbstractReferenceCountedByteBuf是Netty的抽象类,通过继承该类,Flink中的buffer 具备了引用计数的能力,并且实现了对MemorySegment的读写。


2.4 Buffer资源池

   Buffer资源池在Flink中叫作BufferPool。BufferPool用来管理Buffer,包含Buffer的申请、释放、销毁、可用Buffer的通知等,其实现类是LocalBufferPool,每个Task拥有自己的LocalBufferPool。


BufferPool的类体系如下:

e10686192df720b2b0c8b1f8c0117e29.png


为了方便对BufferPool的管理,Flink设计了BufferPoolFactory,提供了BufferPool的创建和销毁,其唯一的实现类是NetworkBufferPool。


每个TaskManager只有一个NetworkBufferPool,同一个TaskManager上的Task共享NetworkBufferPool,在TaskManager启动的时候,就会创建NetworkBufferpool,为其分配内存。


NetworkBufferPool持有该TaskManager在进行数据传递时所能够使用的所有内存,所以其除了作为BufferPool的工厂外,还作为Task所需内存段(MemorySegment)的提供者,每个Task的LocalBufferPool所需要的内存都是从NetworkBufferPool申请而来的。


三、内存管理器

3.1 内存申请

批处理计算任务中,MemoryManager负责为算子申请堆外内存。最终实际申请的是堆外的ByteBuffer。

#MemorySegmentFactory.java

64c3a1445cd7b39b4a115bf7c4156a9a.png

流计算任务中,MemoryManager更多的作用是管理,控制RocksDB的内存使用量,通过RocksDB的Block Cache 和WriteBufferManager参数来限制,参数的具体值从TaskManger的内存配置参数中计算来。RocksDb自己来负责运行过程中的内存申请和内存释放。


#MemoryManager.java


b08131f6a3016b208fd8070552bd673c.png


3.2 内存释放

Flink自行管理内存,也就意味着内存的申请和释放都由Flink来负责。触发Java堆外内存释放的行为一般有如下两种。


a.内存使用完毕


b.Task停止(正常或异常)执行。


在Flink中实现了一个JavaGcCleanerWrapper来进行堆外内存的释放,提供了两个JavaCleaner。


1)LegacyCleanerProvider


该CleanerProvider提供1.8及以下版本JDK的Flink管理的内存的垃圾回收,使用sun.misc.Cleaner来释放内存。


2)Java9CleanerProvider


该CleanerProvider提供1.9及以上版本JDK的Flink管理的内存的垃圾回收,使用java.lang.ref.Cleaner来释放内存。


JavaGcCleanerWrapper会为每一个Owner创建一个包含Cleaner的Runnable对象,在每个MemorySegment释放内存的时候,调用此Cleaner进行内存的释放。


四、网络缓冲器

网络缓冲器(NetworkBuffer)是网络交换数据的包装,其对应于MemorySegment内存段,当结果分区(ResultPartition)开始写出数据的时候,需要向LocalBufferPool申请Buffer资源,使用BufferBuilder将数据写入MemorySegment。当MemorySegment都分配完后,则会持续等待Buffer的释放。


8a3a7a4e776ba31c796b173bb797e552.png


BufferBuilder在上游Task中,用来向申请到的MemorySegment写入数据。与BufferBuilder相对的是BufferConsumer,BufferConsumer位于下游Task中,负责从MemorySegment中读取数据,1个BufferBuilder对应一个BufferConsumer。

4.1 内存申请

LocalBufferPool的大小是动态的,在最小内存段数量与最大内存段数量之间浮动。使用NetworkBufferPool创建LocalBufferPool时,如果该TaskManager的内存无法满足所有Task所需的最小MemorySegment的数据总和,则会发生错误。

1.buffer申请

结果分区(ResultPartition)申请Buffer进行数据写入。

6f9d3c522a6e603fd9b9f103f1d73385.png

LocalBufferPool 首先从自身持有的MemorySegment中分配可用的,如果没有可用的,则从TaskManager的NetworkBufferPool申请,如果没有,则阻塞等待可用的MemorySegment。

a25cbf65ca450d1d278605d0ed3f51f6.png

2.MemorySegment申请

申请Buffer本质上来说就是申请MemorySegment,如果在LocalBufferPool中,则申请新的堆外内存MemorySegment。

06746a242a89ef9cd55d4dab0708d28b.png

4.2 内存回收

Buffer使用了引用计数机制来判断什么时候可以释放Buffer到可用资源池。每创建一个BufferConsumer,就会对Buffer的引用计数+1,每个Buffer被消费完,就会对Buffer的引用计数-1,当Buffer引用计数为0的时候就可以回收了。

1.Buffer回收

80e25b43571d2205fb6c89da99a34d05.png

Buffer回收之后,并不会释放MemorySegment,此时MemorySegment仍然在LocalBufferPool的资源池中,除非TaskManager级别内存不足,才会释放回TaskManager持有的全局资源池。


释放MemorySegment的时候,同样要根据MemorySegment的类型来进行,并且要在不低于保留内存的情况下,将内存释放回内存段中 ,变为可用内存,后续申请MemorySegment的时候,可以重复利用该内存片段。

2.MemorySegment释放

当NetworkBufferPool关闭的时候进行内存的释放,交还给操作系统。


接下来Flink状态原理篇,如果对Flink感兴趣或者正在使用的小伙伴,可以加我入群一起探讨学习。

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
8天前
|
Java 调度 流计算
有没有 大佬用 springboot 启动flink 处理 ,发现springboot 加 schedule调度处理会内存占满情况?
有没有 大佬用 springboot 启动flink 处理 ,发现springboot 加 schedule调度处理会内存占满情况?
38 6
|
5天前
|
存储 算法 关系型数据库
实时计算 Flink版产品使用合集之在Flink Stream API中,可以在任务启动时初始化一些静态的参数并将其存储在内存中吗
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
18 4
|
6天前
|
SQL 关系型数据库 MySQL
实时计算 Flink版产品使用合集之idea本地测试代码,要增大 Flink CDC 在本地 IDEA 测试环境中的内存大小如何解决
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
25 1
|
6天前
|
SQL 资源调度 关系型数据库
实时计算 Flink版产品使用合集之可以使用高并发大内存的方式读取存量数据吗
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
24 3
|
6天前
|
SQL Java 中间件
实时计算 Flink版产品使用合集之在进行全量拉取时,任务完成之后内存没有被完全释放如何解决
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
32 1
|
6天前
|
关系型数据库 MySQL Java
实时计算 Flink版产品使用合集之是否支持内存表的创建
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
19 3
|
8天前
|
SQL 存储 关系型数据库
Flink CDC产品常见问题之内存释放失败如何解决
Flink CDC(Change Data Capture)是一个基于Apache Flink的实时数据变更捕获库,用于实现数据库的实时同步和变更流的处理;在本汇总中,我们组织了关于Flink CDC产品在实践中用户经常提出的问题及其解答,目的是辅助用户更好地理解和应用这一技术,优化实时数据处理流程。
|
8天前
|
SQL Java 关系型数据库
flink cdc 内存问题之不会回收如何解决
Flink CDC(Change Data Capture)是一个基于Apache Flink的实时数据变更捕获库,用于实现数据库的实时同步和变更流的处理;在本汇总中,我们组织了关于Flink CDC产品在实践中用户经常提出的问题及其解答,目的是辅助用户更好地理解和应用这一技术,优化实时数据处理流程。
|
8天前
|
存储 SQL Java
阿里Flink云服务提供了CDC(Change Data Capture)功能
【2月更文挑战第10天】阿里Flink云服务提供了CDC(Change Data Capture)功能
42 1
|
8天前
|
SQL Java Apache
Flink内存问题之内存溢出如何解决
Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。