阿里二面:Flink内存管理是如何实现的?

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 阿里二面:Flink内存管理是如何实现的?

一、内存模型

640.png

从大的方面来说,TaskManager进程的内存模型分为JVM本身所使用的内存和Flink使用的内存,Flink使用了堆上内存和堆外内存。

640.png

1.Flink使用的内存

1)JVM堆上内存


              a.  框架堆上内存Framework Heap Memory。Flink框架本身所使用的的内存,即TaskManager本身所占用的堆上内存,不计入Slot的资源


       配置参数:taskmanager.memory.framework.heap.size = 128MB,默认128MB。


               b. Task堆上内存Task Heap Memory。Task 执行用户代码时所使用的堆上内存。


       配置参数:taskmanager.memory.taks.heap.size。


       2)JVM堆外内存


               a. 框架堆外内存Framework Off-Heap Memory。Flink 框架本身所使用的的内存。即TaskManager本身所占用的堆外内存,不计入Slot资源。


       配置参数:taskmanager.memroy.framework.off-heap.size = 128MB,默认128MB。


               b. Task堆外内存Task Off-Heap Memory。Task执行用户代码时所使用的堆外内存。


       配置参数:taskmanager.memory.task.off-heap.size = 0,默认为0。


               c. 网络缓冲内存Network Memory。网络数据交换所使用的堆外加内存大小,如网络数据交换缓冲区(Network Buffer)。


       配置参数:taskmanager.memory.network.[64/1024/0.1](min/max/fraction),默认 min = 64MB,max = 1gb,fraction = 0.1。


               d. 堆外托管内存 Manged Memory。Flink管理的堆外内存。


       配置参数:taskmanager.memory.managed.[size][fraction],默认fraction = 0.4。

2. JVM本身使用的内存

JVM本身直接使用了操作系统的内存。


1. JVM元空间


JVM 元空间所使用的的内存。


配置参数:taskmanager.memory.jvm-metaspace = 96m, 默认96MB。


2. JVM执行开销


JVM 在执行时自身所需要的的内容,包括线程堆栈、IO、编译缓存等所使用的的内存。


配置参数:taskmanager.memory.jvm-overhead = [min/max/faction]。默认min=192MB,max=1GB,fraction = 0.1。

3. 总体内存

1.Flink使用内存


 综上而言,Flink使用的内存包括Flink使用的堆上、堆外内存。使用参数taskmanager.memory.flink.size进行控制。


2. 进程使用内存


 整个进程所使用的内存,包括Flink使用的内存和JVM使用的内存。使用参数taskmanager.memory.process.size进行控制。


JVM内存控制参数如下:


1)JVM堆上内存,使用-Xmx 和 -Xms 参数进行控制


2)JVM直接内存,使用参数-XX:MaxDirectMemorySize 进行控制。对于托管内存,使用Unsafe.allocateMemory()申请,不受改参数控制。


3)JVM Metaspace 使用-XX:MaxMetaspaceSize进行控制。


二、内存数据结构

2.1 内存段

内存段在Flink内部叫作MemorySegment,是Flink的内存抽象的最小分配单元。默认情况下,一个MemorySegment 对应一个32kb大小的内存块。这块内存既可以是堆上内存(Java的byte数组),也可以是堆外内存(基于Netty的DirecByteBuffer)。


       MemorySegment同时也提供了对二进制数据进行读取和写入的方法。对于Java基本数据类型,如short、int、long等,MemorySegment 内置了方法,可以直接返回或者写入数据,对于其他类型,读取二进制数组byte[]后进行反序列化,序列化为二进制数组byte[]后写入。


1.MemorySegment结构


1)BYTE_ARRAY_BASE_OFFSET:二进制字节数组的起始索引,相对于字节数组对象而言。


2)LITTLE_ENDIAN:判断是否为Little Endian模式的字节存储顺序,若不是,就是Big Endian模式。


3)HeapMemory:如果MemorySegment使用堆上内存,则表示一个堆上的字节数组(byte[]),如果MemorySegment使用对外内存,则为null。


4)address:字节数组对应的相对地址(若HeapMemory为null,即可能为对外内存的绝对地址)。


5)addressLimit:标识地址结束位置(address + size)。


6)size:内存段的字节数。


2.字节顺序Big Endian与Little Endian

字节顺序是指占内存多于一个字节类型的数据在内存中的存放顺序,不同的CPU架构体系使用不同的存储顺序。PowerPC系列采用Big Endian方式存储数据,低地址存放最高有效字节(MSB 高位编址),而x86 系列则采用Little Endian方式存储数据,低地址存放最低有效字节(LSB 低位编址)

640.png


3. MemorySegment实现

Flink的MemorySegment有堆上和堆外两种实现:


640.png


HeapMemorySegment 用来分配堆上内存,HybridMemorySegment用来分配堆外内存和堆上内存。实际上在2017年之后的Flink中,并没有使用HeapMemorySegment,而是使用 HybridMemorySegment这个类来同时实现堆上和堆外内存的分配。


2.2 内存页

内存页是MemorySegment之上的数据访问视图,数据读取抽象为DataInputView,数据写入抽象为DataOutputView。有了这一层,上层使用者无须关心MemorySegment的细节,该层会自动处理跨MemorySegment的读取和写入。

1.DataInputView


640.png


DataInputView 是从MemorySegment数据读取抽象视图,继承自java.io.DataInput.InputView中持有多个MemorySegment的引用(MemorySegment[]),这一组·MemorySegment被视为一个内存页(Page),可以顺序读取MemorySegment中的数据。


2.DataOutputView


640.png


DataOutputView 是从MemorySegment数据读取抽象视图,继承自java.io.DataOutput.OutputView中持有多个MemorySegment的引用(MemorySegment[]),这一组·MemorySegment被视为一个内存页(Page),可以顺序地向MemorySegment中写入数据。


2.3 Buffer

Task算子处理数据完毕,将结果交给下游的时候,使用的抽象或者说内存对象是Buffer。Buffer的接口是网络层面上传输数据和事件的统一抽象,其实现类是NetworkBuffer。Flink在各个TaskManger之间传递数据时,使用的是这一层的抽象。1个NetworkBuffer中包装了一个MemorySegment。


3d6854637a24c184d4aadc5ee841465e.png


Buffer 的底层是MemorySegment,Buffer申请和释放由Flink自行管理,Flink引入了引用数的概念。当有新的Buffer消费者时,引用数加1,当消费者消费完Buffer时,引用数减1,最终当引用数变为0时,就可以将Buffer释放重用了。


AbstractReferenceCountedByteBuf是Netty的抽象类,通过继承该类,Flink中的buffer 具备了引用计数的能力,并且实现了对MemorySegment的读写。


2.4 Buffer资源池

   Buffer资源池在Flink中叫作BufferPool。BufferPool用来管理Buffer,包含Buffer的申请、释放、销毁、可用Buffer的通知等,其实现类是LocalBufferPool,每个Task拥有自己的LocalBufferPool。


BufferPool的类体系如下:

e10686192df720b2b0c8b1f8c0117e29.png


为了方便对BufferPool的管理,Flink设计了BufferPoolFactory,提供了BufferPool的创建和销毁,其唯一的实现类是NetworkBufferPool。


每个TaskManager只有一个NetworkBufferPool,同一个TaskManager上的Task共享NetworkBufferPool,在TaskManager启动的时候,就会创建NetworkBufferpool,为其分配内存。


NetworkBufferPool持有该TaskManager在进行数据传递时所能够使用的所有内存,所以其除了作为BufferPool的工厂外,还作为Task所需内存段(MemorySegment)的提供者,每个Task的LocalBufferPool所需要的内存都是从NetworkBufferPool申请而来的。


三、内存管理器

3.1 内存申请

批处理计算任务中,MemoryManager负责为算子申请堆外内存。最终实际申请的是堆外的ByteBuffer。

#MemorySegmentFactory.java

64c3a1445cd7b39b4a115bf7c4156a9a.png

流计算任务中,MemoryManager更多的作用是管理,控制RocksDB的内存使用量,通过RocksDB的Block Cache 和WriteBufferManager参数来限制,参数的具体值从TaskManger的内存配置参数中计算来。RocksDb自己来负责运行过程中的内存申请和内存释放。


#MemoryManager.java


b08131f6a3016b208fd8070552bd673c.png


3.2 内存释放

Flink自行管理内存,也就意味着内存的申请和释放都由Flink来负责。触发Java堆外内存释放的行为一般有如下两种。


a.内存使用完毕


b.Task停止(正常或异常)执行。


在Flink中实现了一个JavaGcCleanerWrapper来进行堆外内存的释放,提供了两个JavaCleaner。


1)LegacyCleanerProvider


该CleanerProvider提供1.8及以下版本JDK的Flink管理的内存的垃圾回收,使用sun.misc.Cleaner来释放内存。


2)Java9CleanerProvider


该CleanerProvider提供1.9及以上版本JDK的Flink管理的内存的垃圾回收,使用java.lang.ref.Cleaner来释放内存。


JavaGcCleanerWrapper会为每一个Owner创建一个包含Cleaner的Runnable对象,在每个MemorySegment释放内存的时候,调用此Cleaner进行内存的释放。


四、网络缓冲器

网络缓冲器(NetworkBuffer)是网络交换数据的包装,其对应于MemorySegment内存段,当结果分区(ResultPartition)开始写出数据的时候,需要向LocalBufferPool申请Buffer资源,使用BufferBuilder将数据写入MemorySegment。当MemorySegment都分配完后,则会持续等待Buffer的释放。


8a3a7a4e776ba31c796b173bb797e552.png


BufferBuilder在上游Task中,用来向申请到的MemorySegment写入数据。与BufferBuilder相对的是BufferConsumer,BufferConsumer位于下游Task中,负责从MemorySegment中读取数据,1个BufferBuilder对应一个BufferConsumer。

4.1 内存申请

LocalBufferPool的大小是动态的,在最小内存段数量与最大内存段数量之间浮动。使用NetworkBufferPool创建LocalBufferPool时,如果该TaskManager的内存无法满足所有Task所需的最小MemorySegment的数据总和,则会发生错误。

1.buffer申请

结果分区(ResultPartition)申请Buffer进行数据写入。

6f9d3c522a6e603fd9b9f103f1d73385.png

LocalBufferPool 首先从自身持有的MemorySegment中分配可用的,如果没有可用的,则从TaskManager的NetworkBufferPool申请,如果没有,则阻塞等待可用的MemorySegment。

a25cbf65ca450d1d278605d0ed3f51f6.png

2.MemorySegment申请

申请Buffer本质上来说就是申请MemorySegment,如果在LocalBufferPool中,则申请新的堆外内存MemorySegment。

06746a242a89ef9cd55d4dab0708d28b.png

4.2 内存回收

Buffer使用了引用计数机制来判断什么时候可以释放Buffer到可用资源池。每创建一个BufferConsumer,就会对Buffer的引用计数+1,每个Buffer被消费完,就会对Buffer的引用计数-1,当Buffer引用计数为0的时候就可以回收了。

1.Buffer回收

80e25b43571d2205fb6c89da99a34d05.png

Buffer回收之后,并不会释放MemorySegment,此时MemorySegment仍然在LocalBufferPool的资源池中,除非TaskManager级别内存不足,才会释放回TaskManager持有的全局资源池。


释放MemorySegment的时候,同样要根据MemorySegment的类型来进行,并且要在不低于保留内存的情况下,将内存释放回内存段中 ,变为可用内存,后续申请MemorySegment的时候,可以重复利用该内存片段。

2.MemorySegment释放

当NetworkBufferPool关闭的时候进行内存的释放,交还给操作系统。


接下来Flink状态原理篇,如果对Flink感兴趣或者正在使用的小伙伴,可以加我入群一起探讨学习。

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
5月前
|
关系型数据库 MySQL 数据库
实时计算 Flink版操作报错合集之网络缓冲池(NetworkBufferPool)中可用内存不足,该如何解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
6月前
|
消息中间件 存储 Kafka
实时计算 Flink版产品使用问题之 从Kafka读取数据,并与两个仅在任务启动时读取一次的维度表进行内连接(inner join)时,如果没有匹配到的数据会被直接丢弃还是会被存储在内存中
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
3月前
|
缓存 监控 NoSQL
阿里面试让聊一聊Redis 的内存淘汰(驱逐)策略
大家好,我是 V 哥。粉丝小 A 面试阿里时被问到 Redis 的内存淘汰策略问题,特此整理了一份详细笔记供参考。Redis 的内存淘汰策略决定了在内存达到上限时如何移除数据。希望这份笔记对你有所帮助!欢迎关注“威哥爱编程”,一起学习与成长。
|
2月前
|
存储 Kubernetes 架构师
阿里面试:JVM 锁内存 是怎么变化的? JVM 锁的膨胀过程 ?
尼恩,一位经验丰富的40岁老架构师,通过其读者交流群分享了一系列关于JVM锁的深度解析,包括偏向锁、轻量级锁、自旋锁和重量级锁的概念、内存结构变化及锁膨胀流程。这些内容不仅帮助群内的小伙伴们顺利通过了多家一线互联网企业的面试,还整理成了《尼恩Java面试宝典》等技术资料,助力更多开发者提升技术水平,实现职业逆袭。尼恩强调,掌握这些核心知识点不仅能提高面试成功率,还能在实际工作中更好地应对高并发场景下的性能优化问题。
|
4月前
|
NoSQL 关系型数据库 MySQL
实时计算 Flink版产品使用问题之全量同步的内存释放该怎么实现
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
4月前
|
SQL 存储 关系型数据库
实时计算 Flink版产品使用问题之同步MySQL多张表的过程中,内存释放依赖于什么
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
5月前
|
存储 缓存 监控
Flink内存管理机制及其参数调优
Flink内存管理机制及其参数调优
|
5月前
|
SQL Java 调度
实时计算 Flink版产品使用问题之使用Spring Boot启动Flink处理任务时,使用Spring Boot的@Scheduled注解进行定时任务调度,出现内存占用过高,该怎么办
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
5月前
|
缓存 关系型数据库 MySQL
实时计算 Flink版产品使用问题之缓存内存占用较大一般是什么导致的
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
5月前
|
资源调度 Java 关系型数据库
实时计算 Flink版产品使用问题之如何解决内存占用过大的问题
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。

热门文章

最新文章