Flink内存管理源码解读之内存管理器

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 回顾 上一篇文章我们谈了Flink自主内存管理的一些基础的数据结构。那篇中主要讲了数据结构的定义,这篇我们来看看那些数据结构的使用,以及内存的管理设计。 概述 这篇文章我们主要探讨Flink的内存管理类MemoryManager涉及到对内存的分配、回收,以及针对预分配内存而提供的memory segment pool。

回顾

上一篇文章我们谈了Flink自主内存管理的一些基础的数据结构。那篇中主要讲了数据结构的定义,这篇我们来看看那些数据结构的使用,以及内存的管理设计。

概述

这篇文章我们主要探讨Flink的内存管理类MemoryManager涉及到对内存的分配、回收,以及针对预分配内存而提供的memory segment pool。还有支持跨越多个memory segment数据访问的page view。

本文探讨的类主要位于pageckage : org.apache.flink.runtime.memory下。完整类图:

flink-source-code-analysis-memory-management-1_all-class-diagram

MemoryManager

MemoryManager 作为Flink的内存管理器,承担着MemorySegment的分配、回收等职责。

为了提升MemorySegment的复用能力,它提供了不同memory type的MemorySegment池的实现,它们是:

  • HeapMemoryPool
  • HybirdOffHeapMemoryPool

首先,这里为了提升memory segment操作效率,MemoryManager鼓励长度相等的memory segment。由此引入了page的概念。其实page跟memory segment没有本质上的区别,只不过是为了体现memory segment被分配为均等大小的内存空间而引入的。可以将这个类比于操作系统的页式内存分配,page这里看着同等大小的block即可。MemoryManager提供的默认page size为32KB,并提供了自定义page size的下界值不得小于4KB。

    /** The default memory page size. Currently set to 32 KiBytes. */
    public static final int DEFAULT_PAGE_SIZE = 32 * 1024;

    /** The minimal memory page size. Currently set to 4 KiBytes. */
    public static final int MIN_PAGE_SIZE = 4 * 1024;

MemoryManager允许自定义page size,它提供的构造器可以指定这个参数,一个MemoryManager应对一个page size,而且指定了就不允许改变。

page化的segment跟非page化的segment:

flink-source-code-analysis-memory-management-1_page-and-non-page

将segment page化会给后面的跨多个segment的访问带来更高的效率。

MemoryManager这个类本身没有特别的地方,并可能会被跨线程共享。这时某些操作可能会牵扯到多线程的并发问题。因此,MemoryManager提供了一个对象作为锁,以在某些方法上进行同步操作。

    /** The lock used on the shared structures. */
    private final Object lock = new Object();

两个构造器:

    public MemoryManager(long memorySize, int numberOfSlots) {
    public MemoryManager(long memorySize, int numberOfSlots, int pageSize, MemoryType memoryType, boolean preAllocateMemory) {

第一个构造器内部调用了第二个构造器,并用DEFAULT_PAGE_SIZE作为pageSize,HEAP作为memory type。

需要提及一下的是,这里的参数numberOfSlots 是跟Flink的task manager相关,暂时不做过多介绍,等到我们讲解Flink runtime时再细说。

第二个构造器的另一个参数preAllocateMemory,指定memory manager的内存分配策略是预分配还是按需分配。我们后面会看到,对于这两种策略,相关的内存申请和释放操作是不同的。

第二个构造器内就已经根据memory type将特定的memory pool对象初始化好了:

        switch (memoryType) {
            case HEAP:
                this.memoryPool = new HeapMemoryPool(memToAllocate, pageSize);
                break;
            case OFF_HEAP:
                this.memoryPool = new HybridOffHeapMemoryPool(memToAllocate, pageSize);
                break;
            default:
                throw new IllegalArgumentException("unrecognized memory type: " + memoryType);
        }

通过定位到两个pool对象的构造器,可以看到在实例化构造器的时候就已经将需要预分配的内存分配到位了(当然,这里是针对preAllocateMemory为true的调用情景而言),因为如果该参数为false,那么pool构造器的memToAllocate将会被置为0。

        public HeapMemoryPool(int numInitialSegments, int segmentSize) {
            this.availableMemory = new ArrayDeque<byte[]>(numInitialSegments);
            this.segmentSize = segmentSize;

            for (int i = 0; i < numInitialSegments; i++) {
                this.availableMemory.add(new byte[segmentSize]);
            }
        }
        public HybridOffHeapMemoryPool(int numInitialSegments, int segmentSize) {
            this.availableMemory = new ArrayDeque<ByteBuffer>(numInitialSegments);
            this.segmentSize = segmentSize;

            for (int i = 0; i < numInitialSegments; i++) {
                this.availableMemory.add(ByteBuffer.allocateDirect(segmentSize));
            }
        }

针对offheap的内存池对象,分配内存调用的是java SE的API

public static ByteBuffer allocateDirect(int capacity);

该API用于分配一个新的direct byte buffer,大小限制即为capacity,其他的一切都是不确定的(包括它内部是否是一个byte数组)。

MemoryManager并没有类似于open这样的方法,它的构造器已经包含了这些动作。但它有明确的shutdown方法,主要的作用是:释放内存,清空内存池。

    public void shutdown() {
        // -------------------- BEGIN CRITICAL SECTION -------------------
        synchronized (lock)
        {
            if (!isShutDown) {
                // mark as shutdown and release memory
                isShutDown = true;
                numNonAllocatedPages = 0;

                // go over all allocated segments and release them
                for (Set<MemorySegment> segments : allocatedSegments.values()) {
                    for (MemorySegment seg : segments) {
                        seg.free();
                    }
                }

                memoryPool.clear();
            }
        }
        // -------------------- END CRITICAL SECTION -------------------
    }

下面进入到allocate以及release方法,这里因为篇幅的关系,我们只关注核心逻辑。这两个方法都共同拥有一个参数owner,说白了就是一个映射关系,谁申请的memory segment,将会挂到谁的名下,释放的时候也从谁的名下删除。大致如下图所示:

flink-source-code-analysis-memory-management-1_allocated-segment

allocate:

        synchronized (lock)
        {

            Set<MemorySegment> segmentsForOwner = allocatedSegments.get(owner);
            if (segmentsForOwner == null) {
                segmentsForOwner = new HashSet<MemorySegment>(numPages);
                allocatedSegments.put(owner, segmentsForOwner);
            }

            if (isPreAllocated) {
                for (int i = numPages; i > 0; i--) {
                    MemorySegment segment = memoryPool.requestSegmentFromPool(owner);
                    target.add(segment);
                    segmentsForOwner.add(segment);
                }
            }
            else {
                for (int i = numPages; i > 0; i--) {
                    MemorySegment segment = memoryPool.allocateNewSegment(owner);
                    target.add(segment);
                    segmentsForOwner.add(segment);
                }
                numNonAllocatedPages -= numPages;
            }
        }

上面的预防性检查就不看了,主要逻辑有三段。先获取该owner下的segment集合,如何不存在,则初始化;然后如果是预分配的模式,则直接从池里取出来,加入到当前owner申请的segment集合中;如果不是预分配模式,则立即分配。

release分为单个memory segment释放以及多segment一起释放。

先来看单个memory segment释放:

synchronized (lock)
        {
            // remove the reference in the map for the owner
            try {
                Set<MemorySegment> segsForOwner = this.allocatedSegments.get(owner);

                if (segsForOwner != null) {
                    segsForOwner.remove(segment);
                    if (segsForOwner.isEmpty()) {
                        this.allocatedSegments.remove(owner);
                    }
                }

                if (isPreAllocated) {
                    // release the memory in any case
                    memoryPool.returnSegmentToPool(segment);
                }
                else {
                    segment.free();
                    numNonAllocatedPages++;
                }
            }
            catch (Throwable t) {
                throw new RuntimeException("Error removing book-keeping reference to allocated memory segment.", t);
            }
        }

基本和allocate是相反的逻辑,不过有一点要注意的是,如果当前释放的segment是segsForOwner集合中的最后一个,那么将segsForOwner也从allocatedSegments中移除。

释放多个memory segment:

        synchronized (lock)
        {
            // since concurrent modifications to the collection
            // can disturb the release, we need to try potentially multiple times
            boolean successfullyReleased = false;
            do {
                final Iterator<MemorySegment> segmentsIterator = segments.iterator();

                Object lastOwner = null;
                Set<MemorySegment> segsForOwner = null;

                try {
                    // go over all segments
                    while (segmentsIterator.hasNext()) {

                        final MemorySegment seg = segmentsIterator.next();
                        if (seg == null || seg.isFreed()) {
                            continue;
                        }

                        final Object owner = seg.getOwner();

                        try {
                            // get the list of segments by this owner only if it is a different owner than for
                            // the previous one (or it is the first segment)
                            if (lastOwner != owner) {
                                lastOwner = owner;
                                segsForOwner = this.allocatedSegments.get(owner);
                            }

                            // remove the segment from the list
                            if (segsForOwner != null) {
                                segsForOwner.remove(seg);
                                if (segsForOwner.isEmpty()) {
                                    this.allocatedSegments.remove(owner);
                                }
                            }

                            if (isPreAllocated) {
                                memoryPool.returnSegmentToPool(seg);
                            }
                            else {
                                seg.free();
                                numNonAllocatedPages++;
                            }
                        }
                        catch (Throwable t) {
                            throw new RuntimeException(
                                    "Error removing book-keeping reference to allocated memory segment.", t);
                        }
                    }

                    segments.clear();

                    // the only way to exit the loop
                    successfullyReleased = true;
                }
                catch (ConcurrentModificationException e) {
                    // this may happen in the case where an asynchronous
                    // call releases the memory. fall through the loop and try again
                }
            } while (!successfullyReleased);
        }

可以看到,核心逻辑跟单个segment的释放并没有太大的不同,但最外层套了个do/while循环,用于在释放失败之后不断重试,直到释放成功为止。这里可能涉及到对正在释放的segments的并发修改产生ConcurrentModificationException异常而失败。

还有一个基于owner的释放方法:

    public void releaseAll(Object owner) {

逻辑也是大同小异,不再废话。

以上,MemoryManager的绝大部分方法都介绍完毕,逻辑相对还是比较清晰的。下面简单介绍一下几个内部的Pool,上面也已经提过了。

MemoryPool

MemoryPool是真正跟MemorySegment打交道的地方,涉及到申请、分配、回收等操作。它提供了两个实现类,两个pool的实现都采用JDK的ArrayDeque(双端队列)作为存储所有内存块引用的数据结构。

对于内存块的类型,HeapMemoryPool所代表的on-heap内存使用byte[]表示,而HybridOffHeapMemoryPool所代表的off-heap内存使用ByteBuffer表示。

注意这里的分类方式不同于我们讲基础数据结构时的HeapMemorySegement以及HybirdMemorySegment划分方式,这里为了表示内存块的引用,直接区分了on-heap和off-heap,没有所谓的hybird memory segment。

关于pool对象,没有太多特别的,主要是三个接口方法:

  • allocateNewSegment : 直接申请一个新的segment,该segment不属于pool
  • requestSegmentFromPool : 从池里请求一个segment,调用ArrayDeque的remove(首元素出队)方法,获得一个内存块引用,并包装成segment
  • returnSegementToPool : 调用ArrayDeque的add(入队到尾部)方法,添加内存块引用并调用MemorySegmentfree方法释放该segement

AbstractPagedXXXView

还记得我们在谈Flink内存管理基础数据结构时谈到DataInputViewDataOutputView。这个包就提供了基于page的对view的进一步实现(并非完整实现,从名字也能看出,它只是个抽象类)。

说得更直白一点就是,它提供了跨越多个memory page的数据访问(input/output)视图。它包含了从page中读取/写入数据的解码/编码方法以及跨越page的边界检查(边界检查主要由实现类来完成)。

说说两个View类都有的几个字段:

  • currentSegment : 表示当前正在操作的memory segment
  • headerLength : 每个memory segment前面有一段是头部,可能存储一些元数据信息,数据访问的时候需要跳过这个长度,要求这个pageview指代的所有memory segment的header length都相等
  • positionInSegment : 类似于一个指针,指向某个segment的某个位置(相对segment的位置)

AbstractPagedInputView

  • limitInSegment : 指定跟下一个segment的界限值

AbstractPagedOutputView

  • segmentSize : 指定所有segment size的值,也即一个page view里所有的segment长度都是一样的。

两个view类都有一些共同的方法,其中的一个是advance,它用于从当前segment切换到下一个segment。

其他的一些方法都是数据输入/输出相关的特定方法。

我们看一个最核心的方法,AbstractPagedOutputView的write方法:

    public void write(byte[] b, int off, int len) throws IOException {
        int remaining = this.segmentSize - this.positionInSegment;
        if (remaining >= len) {
            this.currentSegment.put(this.positionInSegment, b, off, len);
            this.positionInSegment += len;
        }
        else {
            if (remaining == 0) {
                advance();
                remaining = this.segmentSize - this.positionInSegment;
            }
            while (true) {
                int toPut = Math.min(remaining, len);
                this.currentSegment.put(this.positionInSegment, b, off, toPut);
                off += toPut;
                len -= toPut;

                if (len > 0) {
                    this.positionInSegment = this.segmentSize;
                    advance();
                    remaining = this.segmentSize - this.positionInSegment;  
                }
                else {
                    this.positionInSegment += toPut;
                    break;
                }
            }
        }
    }

这就是一个跨越多个segment的写数据的核心方法,我们大致分析一下逻辑。

前提说明:remaining这个变量始终表示,currentSegment剩余的内存。

首先,判断remaining跟len的大小关系,如果

remaining >= len

即剩余空间是足够写入len长度的数据的,那么就直接写入数据,将positionInSegment后移len长度

否则,表明currentSegment剩余空间不够。首先判断一个临界点,如果没有剩余空间了:

remaining == 0

则直接切换到下一个segment,并重新计算remaining。然后进入一个循环:判断,切换segment,计算剩余空间,移动指针。大致是:

remaininglen之间取较小值,作为待写入的长度。然后将toPut长度的数据写入currentSegemnt。然后off指针后移toPut长度(注意,off是指向待写入数据b数组的);同时写入的数据长度len减去已写入的toPut长度。

接着判断,如果是remainng 小于 len:

len > 0

那么说明还是要切换到下一个segment(currentSegment 不够存)。然后重新计算新的currentSegment重新进入循环流程。

否则,说明remainng大于len,说明数据已经足够存储了,则移动positionInSegment指针,然后跳出循环。

那么AbstractPagedInputView的read方法也就不难理解了的。

总结

本文分析了Flink的内存管理机制。总得来说有几点是其核心思路:

  • 构建segment对象池(pool化),增强复用性,减少重复分配,回收的开销
  • 规范segment的大小(page化),提升操作效率
  • 抽象跨segment的访问复杂性(view化)

原文发布时间为:2016-04-06
本文作者:vinoYang
本文来自云栖社区合作伙伴 CSDN博客,了解相关信息可以关注CSDN博客。
相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
18天前
|
缓存 算法 Java
本文聚焦于Java内存管理与调优,介绍Java内存模型、内存泄漏检测与预防、高效字符串拼接、数据结构优化及垃圾回收机制
在现代软件开发中,性能优化至关重要。本文聚焦于Java内存管理与调优,介绍Java内存模型、内存泄漏检测与预防、高效字符串拼接、数据结构优化及垃圾回收机制。通过调整垃圾回收器参数、优化堆大小与布局、使用对象池和缓存技术,开发者可显著提升应用性能和稳定性。
36 6
|
2月前
|
存储 程序员 编译器
C语言——动态内存管理与内存操作函数
C语言——动态内存管理与内存操作函数
|
2月前
|
存储 缓存 监控
深入了解MySQL内存管理:如何查看MySQL使用的内存
深入了解MySQL内存管理:如何查看MySQL使用的内存
336 1
|
2月前
|
存储 安全 程序员
【C++篇】深入内存迷宫:C/C++ 高效内存管理全揭秘
【C++篇】深入内存迷宫:C/C++ 高效内存管理全揭秘
65 3
|
3月前
|
Java
在 ArkTS 中,如何有效地进行内存管理和避免内存泄漏?
【9月更文挑战第25天】在ArkTS中,有效进行内存管理并避免内存泄漏的方法包括:及时释放不再使用的资源,如关闭监听器和清理定时器;避免循环引用,通过弱引用打破循环;合理使用单例模式,确保单例对象正确释放;及时处理不再使用的页面和组件,在卸载时清理相关资源。
103 9
|
2月前
|
Java C语言 iOS开发
MacOS环境-手写操作系统-16-内存管理 解析内存状态
MacOS环境-手写操作系统-16-内存管理 解析内存状态
40 0
|
3月前
|
监控 Java 大数据
【Java内存管理新突破】JDK 22:细粒度内存管理API,精准控制每一块内存!
【9月更文挑战第9天】虽然目前JDK 22的确切内容尚未公布,但我们可以根据Java语言的发展趋势和社区的需求,预测细粒度内存管理API可能成为未来Java内存管理领域的新突破。这套API将为开发者提供前所未有的内存控制能力,助力Java应用在更多领域发挥更大作用。我们期待JDK 22的发布,期待Java语言在内存管理领域的持续创新和发展。
|
3月前
|
存储 并行计算 算法
CUDA统一内存:简化GPU编程的内存管理
在GPU编程中,内存管理是关键挑战之一。NVIDIA CUDA 6.0引入了统一内存,简化了CPU与GPU之间的数据传输。统一内存允许在单个地址空间内分配可被两者访问的内存,自动迁移数据,从而简化内存管理、提高性能并增强代码可扩展性。本文将详细介绍统一内存的工作原理、优势及其使用方法,帮助开发者更高效地开发CUDA应用程序。
|
3月前
|
监控 算法 数据可视化
深入解析Android应用开发中的高效内存管理策略在移动应用开发领域,Android平台因其开放性和灵活性备受开发者青睐。然而,随之而来的是内存管理的复杂性,这对开发者提出了更高的要求。高效的内存管理不仅能够提升应用的性能,还能有效避免因内存泄漏导致的应用崩溃。本文将探讨Android应用开发中的内存管理问题,并提供一系列实用的优化策略,帮助开发者打造更稳定、更高效的应用。
在Android开发中,内存管理是一个绕不开的话题。良好的内存管理机制不仅可以提高应用的运行效率,还能有效预防内存泄漏和过度消耗,从而延长电池寿命并提升用户体验。本文从Android内存管理的基本原理出发,详细讨论了几种常见的内存管理技巧,包括内存泄漏的检测与修复、内存分配与回收的优化方法,以及如何通过合理的编程习惯减少内存开销。通过对这些内容的阐述,旨在为Android开发者提供一套系统化的内存优化指南,助力开发出更加流畅稳定的应用。
75 0
|
4月前
|
NoSQL 关系型数据库 MySQL
实时计算 Flink版产品使用问题之全量同步的内存释放该怎么实现
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
下一篇
无影云桌面