Flink TaskManager 内存模型详解

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: Apache Flink 通过严格控制其各个组件的内存使用,在 JVM 之上提供了高效的工作负载。虽然社区努力为所有配置提供合理的默认值,但不可能适用于用户在 Flink 上部署的所有应用程序。为了向我们的用户提供最大的生产价值,Flink 允许在集群内进行高级和细粒度的内存分配调优。

Apache Flink 通过严格控制其各个组件的内存使用,在 JVM 之上提供了高效的工作负载。虽然社区努力为所有配置提供合理的默认值,但不可能适用于用户在 Flink 上部署的所有应用程序。为了向我们的用户提供最大的生产价值,Flink 允许在集群内进行高级和细粒度的内存分配调优。


我们都知道不管 Flink 运行在什么集群上,真正干活的都是 TaskManager (后面简称为 TM),JobManager (后面简称为 JM)只负责任务的调度,所以了解 TM 的内存模型是非常有必要的,今天这篇文章就来说一下 TM 的内存模型,JM 的内存模型相对简单,这里就不再说了.


在 Flink 1.10.0 版本中,社区对 TM 的内存模型做了进一步的改进和升级,虽然内存的划分已经很明确,但还是让人看的眼花缭乱,容易混淆.先来看下面这张图.


Flink  TM 内存模型



image-20210403163557657

在 Flink 1.12.0 版本中对 UI 进行了改进,在 TM 的页面增加了一个内存模型图,清楚的显示了每个区域的内存配置以及使用情况.


Total Process Memory (进程总内存) 包含了 Flink 应用程序使用的全部内存资源:Total Flink Memory (Flink应用使用的内存) + 运行 Flink JVM 使用的内存。Total Process Memory 对应 Yarn/Mesos 等容器化部署模式(需要用户指定),相当于申请容器的大小,Total Flink Memory 对应 standalone 部署模式(需要用户指定)。
Total Flink Memory 内部分成了:堆内内存 + 堆外内存:
堆内内存包括两部分:FreameWork Heap Memory (框架堆内存) + Task Heap Memory (任务堆内存)
堆外内存包含三部分:Managed Memory (托管内存) + Framework Off-heap Memory (框架堆外内存) + Network Memory (网络内存)

下面就按照上图中编号顺序分别介绍一下这些内存的作用以及如何配置


Framework Heap


含义描述


Flink 框架本身占用的内存,这部分的内存一般情况下是不需要修改的,在特殊的情况下可能需要调整.


参数设置


taskmanager.memory.framework.heap.size:堆内部分(Framework Heap),默认值 128M;


taskmanager.memory.framework.off-heap.size:堆外部分(Framework Off-Heap),以直接内存形式分配,默认值 128M。


Task Heap


含义描述


用于 Flink 应用的算子及用户代码占用的内存。


参数设置


taskmanager.memory.task.heap.size:堆内部分(Task Heap),无默认值,一般不建议设置,会自动用 Flink 总内存减去框架、托管、网络三部分的内存推算得出。


taskmanager.memory.task.off-heap.size:堆外部分(Task Off-Heap),以直接内存形式分配,默认值为 0,即不使用。如果代码中需要调用 Native Method 并分配堆外内存,可以指定该参数。一般不使用,所以大多数时候可以保持0。


Managed Memory


含义描述


纯堆外内存,由 MemoryManager 管理,用于中间结果缓存、排序、哈希表等,以及 RocksDB 状态后端。可见,RocksDB 消耗的内存可以由用户显式控制了,不再像旧版本一样难以预测和调节。


参数设置


taskmanager.memory.managed.fraction:托管内存占 Flink 总内存 taskmanager.memory.flink.size 的比例,默认值 0.4;


taskmanager.memory.managed.size:托管内存的大小,无默认值,一般也不指定,而是依照上述比例来推定,更加灵活。


Network


含义描述


Network Memory 使用的是 Directory memory,在 Task 与 Task 之间进行数据交换时(shuffle),需要将数据缓存下来,缓存能够使用的内存大小就是这个 Network Memory。它是由三个参数决定.


参数设置


taskmanager.memory.network.min:网络缓存的最小值,默认 64MB;


taskmanager.memory.network.max:网络缓存的最大值,默认 1GB;


taskmanager.memory.network.fraction:网络缓存占 Flink 总内存 taskmanager.memory.flink.size 的比例,默认值 0.1。若根据此比例算出的内存量比最小值小或比最大值大,就会限制到最小值或者最大值。


JVM Metaspace


含义描述


从 JDK 8 开始,JVM 把永久代拿掉了。类的一些元数据放在叫做 Metaspace 的 Native Memory 中。在 Flink 中的 JVM Metaspace Memory 也一样,它配置的是 Task Manager JVM 的元空间内存大小。


参数设置


taskmanager.memory.jvm-metaspace.size:默认值 256MB。


JVM Overhead


含义描述


保留给 JVM 其他的内存开销。例如:Thread Stack、code cache、GC 回收空间等等。和 Network Memory 的配置方法类似。它也由三个配置决定


参数设置


taskmanager.memory.jvm-overhead.min:JVM 额外开销的最小值,默认 192MB;


taskmanager.memory.jvm-overhead.max:JVM 额外开销的最大值,默认 1GB;


taskmanager.memory.jvm-overhead.fraction:JVM 额外开销占 TM 进程总内存


taskmanager.memory.process.size(注意不是 Flink 总内存)的比例,默认值 0.1。若根据此比例算出的内存量比最小值小或比最大值大,就会限制到最小值或者最大值。


我们再来看一下 TM 启动日志里面内存相关的配置信息如下:


INFO  org.apache.flink.yarn.YarnTaskExecutorRunner                 [] -  Program Arguments:
 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner                 [] -     -D
 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner                 [] -     taskmanager.memory.framework.off-heap.size=134217728b(128M)
 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner                 [] -     -D
 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner                 [] -     taskmanager.memory.network.max=214748368b(204.8M)
 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner                 [] -     -D
 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner                 [] -     taskmanager.memory.network.min=214748368b(204.8M)
 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner                 [] -     -D
 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner                 [] -     taskmanager.memory.framework.heap.size=134217728b(128M)
 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner                 [] -     -D
 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner                 [] -     taskmanager.memory.managed.size=858993472b(819.2M)
 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner                 [] -     -D
 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner                 [] -     taskmanager.cpu.cores=4.0
 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner                 [] -     -D
 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner                 [] -     taskmanager.memory.task.heap.size=805306352b(767.9M)
 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner                 [] -     -D
 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner                 [] -     taskmanager.memory.task.off-heap.size=0b


这个内存就是上图中 Configured Values 显示的值,我们把这些值加起来

taskmanager.memory.framework.off-heap.size + taskmanager.memory.network.max + taskmanager.memory.framework.heap.size + taskmanager.memory.managed.size + taskmanager.memory.task.heap.size = 128 + 205 + 128 + 819 + 768 = taskmanager.memory.flink.size = 2048M 这个结果跟我们在 flink-conf.yaml 中的配置是能对上的.


总结


上面这么多的内存,到底应该怎么配置呢?首先官网不建议同时设置进程总内存和 Flink 总内存。这可能会造成内存配置冲突,从而导致部署失败。额外配置其他内存部分时,同样需要注意可能产生的配置冲突。


其实 taskmanager.memory.framework.heap.size ,taskmanager.memory.framework.off-heap.size,JVM Metaspace,JVM Overhead 这几个参数一般情况下是不需要配置的,走默认值就可以了.我们主要关注的是 Task Heap、Managed Memory、Network 这几部分的内存,当然 Flink 本身也会计算出这 3 部分的内存,我们自己也需要根据任务的特点,比如流量大小,状态大小等去调整.

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
13天前
|
消息中间件 存储 Kafka
实时计算 Flink版产品使用问题之 从Kafka读取数据,并与两个仅在任务启动时读取一次的维度表进行内连接(inner join)时,如果没有匹配到的数据会被直接丢弃还是会被存储在内存中
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
1月前
|
资源调度 关系型数据库 数据库
实时计算 Flink版产品使用合集之flink-cdc.sh xx.yaml提交到yarn 发现没有启动task manager的,怎么处理
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStreamAPI、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
13天前
|
SQL 存储 资源调度
实时计算 Flink版产品使用问题之如何对搭建的集群的taskmanager数量进行扩容或缩容
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
12天前
|
资源调度 分布式计算 Oracle
实时计算 Flink版操作报错合集之flink on yarn job manager 可以启动, 但不给分配slot ,是什么原因
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
13天前
|
SQL Kubernetes 数据处理
实时计算 Flink版产品使用问题之在 flink-conf.yaml 中定义的配置在某些情况下未被正确应用到 K8s 上运行的任务管理器(JobManager)和任务管理节点(TaskManager),是什么导致的
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
1月前
|
消息中间件 SQL Java
实时计算 Flink版产品使用合集之管理内存webui上一直是百分百是什么导致的
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStreamAPI、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
1月前
|
关系型数据库 测试技术 数据处理
实时计算 Flink版产品使用合集之TaskManager宕机是什么原因
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStreamAPI、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
5天前
|
存储 Java C++
Java虚拟机(JVM)管理内存划分为多个区域:程序计数器记录线程执行位置;虚拟机栈存储线程私有数据
Java虚拟机(JVM)管理内存划分为多个区域:程序计数器记录线程执行位置;虚拟机栈存储线程私有数据,如局部变量和操作数;本地方法栈支持native方法;堆存放所有线程的对象实例,由垃圾回收管理;方法区(在Java 8后变为元空间)存储类信息和常量;运行时常量池是方法区一部分,保存符号引用和常量;直接内存非JVM规范定义,手动管理,通过Buffer类使用。Java 8后,永久代被元空间取代,G1成为默认GC。
16 2
|
9天前
|
存储
数据在内存中的存储(2)
数据在内存中的存储(2)
22 5
|
9天前
|
存储 小程序 编译器
数据在内存中的存储(1)
数据在内存中的存储(1)
27 5