Flink内存管理源码解读之基础数据结构

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 概述 在分布式实时计算领域,如何让框架/引擎足够高效地在内存中存取、处理海量数据是一个非常棘手的问题。在应对这一问题上Flink无疑是做得非常杰出的,Flink的自主内存管理设计也许比它自身的知名度更高一些。

在分布式实时计算领域,如何让框架/引擎足够高效地在内存中存取、处理海量数据是一个非常棘手的问题。在应对这一问题上Flink无疑是做得非常杰出的,Flink的自主内存管理设计也许比它自身的知名度更高一些。正好最近在研读Flink的源码,所以开两篇文章来谈谈Flink的内存管理设计。

Flink的内存管理的亮点体现在作为以Java为主的(部分功能用Scala实现,也是一种遵循JVM规范并依赖JVM解释执行的函数式编程语言)的程序却自主实现内存的管理而不完全依赖于JVM的内存管理机制。它的优势在于灵活、为大数据场景而生、避免(不受控的)频繁GC导致的性能波动,某种程度上跳出了JVM的限制,是一种思路上的开拓。

基本上我们将Flink的内存设计分为两个部分(遵循package的划分方式):

  • 基础数据结构(package:org.apache.flink.core.memory)
  • 内存管理机制(package:org.apache.flink.runtime.memory)

我们将分开来进行讲解,本篇主要关注基本数据结构。内存管理机制请等待后续文章分析。

下图是该package中所有类的关系图:

all-class-diagram

其中:MemorySegmentHeapMemorySegmentHybridMemorySegment是最为关键的三个类,我们将重点分析。

Flink抽象出的内存类型

Flink将其管理的内存抽象为两种类型(主要的抽象依据内存的位置):

  • HEAP:JVM堆内存
  • OFF_HEAP:非堆内存

这在Flink中被定义为一个枚举类型:MemoryType

MemorySegment

Flink所管理的内存被抽象为数据结构:MemorySegment

据此,Flink为它提供了两种实现:

memorysegment-implement

  • HeapMemorySegment : 管理的内存还是JVM堆内存的一部分
  • HybridMemorySegment : Hybrid(on-heap or off-heap)MemorySegment,内存可能为JVM堆内存,也可能不是。

MemorySegment的相关字段:

MemorySegment-all-fields

  • UNSAFE : 用来对堆/非堆内存进行操作,是JVM的非安全的API
  • BYTE_ARRAY_BASE_OFFSET : 二进制字节数组的起始索引,相对于字节数组对象
  • LITTLE_ENDIAN : 布尔值,是否为小端对齐(涉及到字节序的问题)
  • heapMemory : 如果为堆内存,则指向访问的内存的引用,否则若内存为非堆内存,则为null
  • address : 字节数组对应的相对地址(若heapMemory为null,即可能为off-heap内存的绝对地址,后续会详解)
  • addressLimit : 标识地址结束位置(address+size)
  • size : 内存段的字节数

其中,LITTLE_ENDIAN获取的是当前操作系统的字节顺序,它是布尔值,后续的很多put/get操作都需要先判断是bigedian(大端)还是littleedian(小端)。

关于字节序的问题,如果不明白请自行Google

进入代码主题,针对on-heap内存和off-heap内存提供了两个构造器:

MemorySegment-ctor

并且,提供了一大堆get/put方法,这些getXXX/putXXX大都直接或者间接调用了unsafe.getXXX/unsafe.putXXX。这些处理不同内存类型公共的方法在MemorySegment中实现。

MemorySegment-public-method

当然不止这么多,这只是部分。

而特定的内存访问实现在两个各自类中。

MemorySegment-abstract-method

在MemorySegment类中还有三个值得关注的方法:

    public final void copyTo(int offset, MemorySegment target, int targetOffset, int numBytes) {
        final byte[] thisHeapRef = this.heapMemory;
        final byte[] otherHeapRef = target.heapMemory;
        final long thisPointer = this.address + offset;
        final long otherPointer = target.address + targetOffset;

        if ( (numBytes | offset | targetOffset) >= 0 &&
                thisPointer <= this.addressLimit - numBytes && otherPointer <= target.addressLimit - numBytes)
        {
            UNSAFE.copyMemory(thisHeapRef, thisPointer, otherHeapRef, otherPointer, numBytes);
        }
        else if (this.address > this.addressLimit) {
            throw new IllegalStateException("this memory segment has been freed.");
        }
        else if (target.address > target.addressLimit) {
            throw new IllegalStateException("target memory segment has been freed.");
        }
        else {
            throw new IndexOutOfBoundsException(
                    String.format("offset=%d, targetOffset=%d, numBytes=%d, address=%d, targetAddress=%d",
                    offset, targetOffset, numBytes, this.address, target.address));
        }
    }

这是一个批量拷贝方法,用于从当前memory segment的offset偏移量开始拷贝numBytes长度的字节到target memory segment中从targetOffset起始的地方。

    public final int compare(MemorySegment seg2, int offset1, int offset2, int len) {
        while (len >= 8) {
            long l1 = this.getLongBigEndian(offset1);
            long l2 = seg2.getLongBigEndian(offset2);

            if (l1 != l2) {
                return (l1 < l2) ^ (l1 < 0) ^ (l2 < 0) ? -1 : 1;
            }

            offset1 += 8;
            offset2 += 8;
            len -= 8;
        }
        while (len > 0) {
            int b1 = this.get(offset1) & 0xff;
            int b2 = seg2.get(offset2) & 0xff;
            int cmp = b1 - b2;
            if (cmp != 0) {
                return cmp;
            }
            offset1++;
            offset2++;
            len--;
        }
        return 0;
    }

自实现的比较方法,用于对当前memory segment偏移offset1长度为len的数据与seg2偏移起始位offset2长度为len的数据进行比较。

这里有两个while循环:

  • 第一个while是逐字节比较,如果len的长度大于8就从各自的起始偏移量开始获取其数据的长整形表示进行对比,如果相等则各自后移8位(一个字节),并且长度减8,以此循环往复。

  • 第二个循环比较的是最后剩余不到一个字节(八个比特位),因此是按位比较

    public final void swapBytes(byte[] tempBuffer, MemorySegment seg2, int offset1, int offset2, int len) {
        if ( (offset1 | offset2 | len | (tempBuffer.length - len) ) >= 0) {
            final long thisPos = this.address + offset1;
            final long otherPos = seg2.address + offset2;

            if (thisPos <= this.addressLimit - len && otherPos <= seg2.addressLimit - len) {
                // this -> temp buffer
                UNSAFE.copyMemory(this.heapMemory, thisPos, tempBuffer, BYTE_ARRAY_BASE_OFFSET, len);

                // other -> this
                UNSAFE.copyMemory(seg2.heapMemory, otherPos, this.heapMemory, thisPos, len);

                // temp buffer -> other
                UNSAFE.copyMemory(tempBuffer, BYTE_ARRAY_BASE_OFFSET, seg2.heapMemory, otherPos, len);
                return;
            }
            else if (this.address > this.addressLimit) {
                throw new IllegalStateException("this memory segment has been freed.");
            }
            else if (seg2.address > seg2.addressLimit) {
                throw new IllegalStateException("other memory segment has been freed.");
            }
        }

        // index is in fact invalid
        throw new IndexOutOfBoundsException(
                    String.format("offset1=%d, offset2=%d, len=%d, bufferSize=%d, address1=%d, address2=%d",
                            offset1, offset2, len, tempBuffer.length, this.address, seg2.address));
    

这个方法用于对两个memory segment中的一段数据进行交换。除了一些边界值判断,就是一个借助于临时变量的数据交换,只不过用unsafe.copyMemory代替了赋值号而已。

下面我们将探讨Flink提供的对两种类型的内存管理:on-heap 以及 off-heap

HeapMemorySegment

基于JVM堆内存(on-heap)实现的memory segment,这也是Flink最早的内存自管理机制。该类内部定义一个字节数组的引用指向该内存段,之前提到MemorySegment里的那些抽象方法在该类中的实现都基于该内部字节数组的引用进行操作的,以此来获得内建的而非额外的自实现检查(这些检查比如数组越界等)。这是什么意思呢?当你定义

 private byte[] memory

该memory指向MemorySegment中的heapMemory时,实现类似如下这种方法时

    public final byte get(int index) {
        return this.memory[index];
    }

你就可以利用JVM自身的机制来判断index是否在0到length - 1之间。而不用去结合address等属性来判断索引范围了,比如上面这个方法在HybridMemorySegment里是这么实现的

    public byte get(int index) {
        final long pos = address + index;
        if (index >= 0 && pos < addressLimit) {
            return UNSAFE.getByte(heapMemory, pos);
        }
        else if (address > addressLimit) {
            throw new IllegalStateException("segment has been freed");
        }
        else {
            // index is in fact invalid
            throw new IndexOutOfBoundsException();
        }
    }

这个实现必须这么自行check边界值。

因为是JVM的堆内存,所以很多方法的调用可以直接利用JDK自带的方法,比如数组拷贝

    @Override
    public final void get(int index, byte[] dst, int offset, int length) {
        // system arraycopy does the boundary checks anyways, no need to check extra
        System.arraycopy(this.memory, index, dst, offset, length);
    }

    @Override
    public final void put(int index, byte[] src, int offset, int length) {
        // system arraycopy does the boundary checks anyways, no need to check extra
        System.arraycopy(src, offset, this.memory, index, length);
    }

其他方法的实现都很常规,没有太多值得提点的地方。

HybridMemorySegment

这是另一种内存管理实现:它既支持on-heap内存也支持off-heap内存。乍一看,似乎有些匪夷所思,因为已经有一个对on-heap的实现了,为什么还要搞一个Hybrid的,而不是off-heap的? 而且在一个类中对两种不同的内存区域进行操作,也会显得混乱。

那么我们先来看看Flink是如何“优雅”地避免混乱的。这一切还要归功于JVM提供的非安全的操作类(unsafe)提供的一系列方法

 unsafe.XXX(Object o, int offset/position, ...)

这些方法有如下特点:
(1)如果对象o不为null,并且后面的地址或者位置是相对位置,那么会直接对当前对象(比如数组)的相对位置进行操作,既然这里对象不为null,那么这种情况自然满足on-heap的场景;
(2)如果对象o为null,并且后面的地址是某个内存块的绝对地址,那么这些方法的调用也相当于对该内存块进行操作。这里对象o为null,所操作的内存块不是JVM堆内存,这种情况满足了off-heap的场景。

还记得我们在介绍MemorySegment类时,提到的两个属性:

  • heapMemory
  • address

这两个属性组合就可以适配上面的两种场景了。而且,MemorySegment的一个构造参数:offHeapAddress ,已经基本指明了该构造器是专门针对off-heap的了。

MemorySegment给出了一些针对特定数据类型的公共实现,大部分也调用了unsafe的具有如上这种特性的方法,因此其实MemorySegment里已经具有 Hybrid 的意思了。

问题来了,那么Flink是如何获得某个off-heap数据的内存地址呢?答案在如下代码段

    /** The reflection fields with which we access the off-heap pointer from direct ByteBuffers */
    private static final Field ADDRESS_FIELD;

    static {
        try {
            ADDRESS_FIELD = java.nio.Buffer.class.getDeclaredField("address");
            ADDRESS_FIELD.setAccessible(true);
        }
        catch (Throwable t) {
            throw new RuntimeException(
                    "Cannot initialize HybridMemorySegment: off-heap memory is incompatible with this JVM.", t);
        }
    }

通过反射Buffer类获得 address 属性的Field表示,然后

    private static long getAddress(ByteBuffer buffer) {
        if (buffer == null) {
            throw new NullPointerException("buffer is null");
        }
        try {
            return (Long) ADDRESS_FIELD.get(buffer);
        }
        catch (Throwable t) {
            throw new RuntimeException("Could not access direct byte buffer address.", t);
        }
    }

拿到一个buffer的off-heap的地址表示。

虽然通过如上的MemorySegment的两个属性再加上unsafe相关方法的特殊性,HybridMemorySegment的实现已经很清晰,简洁。但它内部还维护了一个指向它管理的off-heap数据的引用:offHeapBuffer。一方面是为了hold住那段内存空间不被释放,另一方面是为了实现自身的一些方法。

MemorySegmentFactory

MemorySegmentFactory是用来创建MemorySegment,而且Flink严重推荐使用它来创建MemorySegment的实例,而不是手动实例化。其目的是:为了让运行时只存在某一种MemorySegment的子类实现的实例,而不是MemorySegment的两个子类的实例都同时存在,因为这会让JIT有加载和选择上的开销,导致大幅降低性能。关于这一点,Flink官方博客专门开了一篇博文来解释他们的对比以及测试方案,请见最后的引用。

MemorySegmentFactory相关的类图

如下图:

MemorySegmentFactory-class-diagram

显而易见,这是设计模式中的工厂方法模式。

MemorySegmentFactory有个内部接口类FactoryMemorySegment的两个实现类的内部类各自实现了该接口,并定义了各自Factory的实现。这块并没有特别的,只是为了防止外部直接实例化HybridMemorySegmentFactoryHeapMemorySegmentFactory,它们各自的构造器都被设置为 private

MemorySegmentFactory类提供了跟Factory接口类似的方法,或者应该说包裹了一层用来指定Factory具体实例的逻辑(基本上每个方法都先调用了ensureInitialized方法):

    private static void ensureInitialized() {
        if (factory == null) {
            factory = HeapMemorySegment.FACTORY;
        }
    }

从上面可以看出,MemorySegmentFactory默认使用的是HeapMemorySegment类的实例来实现MemorySegment

view构建在MemorySegment之上的抽象

除了MemorySegment的相关实现,Flink的Core包还提供了建立在MemorySegment之上的更高的抽象:DataView(数据视图)。

数据视图相关的类关系图:

dataview-class-diagram

有两个接口,分别为输出视图DataOutputView(数据写相关),输入视图DataInputView(数据读相关)。两个接口下分别各有一个子接口提供基于position的seek动作(即指定位置的数据读写操作)。另外分别有两个实现类,它们各自包装了对应的Stream接口。这块也没什么特别的,不做过多说明。

以上是对Flink自主管理内存的数据结构部分的实现解读。

引用

[1]https://flink.apache.org/news/2015/09/16/off-heap-memory.html


原文发布时间为:2016-03-24

本文作者:vinoYang

本文来自云栖社区合作伙伴CSDN博客,了解相关信息可以关注CSDN博客。

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
5天前
|
C语言
【数据结构】栈和队列(c语言实现)(附源码)
本文介绍了栈和队列两种数据结构。栈是一种只能在一端进行插入和删除操作的线性表,遵循“先进后出”原则;队列则在一端插入、另一端删除,遵循“先进先出”原则。文章详细讲解了栈和队列的结构定义、方法声明及实现,并提供了完整的代码示例。栈和队列在实际应用中非常广泛,如二叉树的层序遍历和快速排序的非递归实现等。
63 9
|
4天前
|
存储 搜索推荐 算法
【数据结构】树型结构详解 + 堆的实现(c语言)(附源码)
本文介绍了树和二叉树的基本概念及结构,重点讲解了堆这一重要的数据结构。堆是一种特殊的完全二叉树,常用于实现优先队列和高效的排序算法(如堆排序)。文章详细描述了堆的性质、存储方式及其实现方法,包括插入、删除和取堆顶数据等操作的具体实现。通过这些内容,读者可以全面了解堆的原理和应用。
43 16
|
4天前
|
C语言
【数据结构】二叉树(c语言)(附源码)
本文介绍了如何使用链式结构实现二叉树的基本功能,包括前序、中序、后序和层序遍历,统计节点个数和树的高度,查找节点,判断是否为完全二叉树,以及销毁二叉树。通过手动创建一棵二叉树,详细讲解了每个功能的实现方法和代码示例,帮助读者深入理解递归和数据结构的应用。
31 8
|
7天前
|
存储 C语言
【数据结构】手把手教你单链表(c语言)(附源码)
本文介绍了单链表的基本概念、结构定义及其实现方法。单链表是一种内存地址不连续但逻辑顺序连续的数据结构,每个节点包含数据域和指针域。文章详细讲解了单链表的常见操作,如头插、尾插、头删、尾删、查找、指定位置插入和删除等,并提供了完整的C语言代码示例。通过学习单链表,可以更好地理解数据结构的底层逻辑,提高编程能力。
31 4
|
8天前
|
存储 C语言
【数据结构】顺序表(c语言实现)(附源码)
本文介绍了线性表和顺序表的基本概念及其实现。线性表是一种有限序列,常见的线性表有顺序表、链表、栈、队列等。顺序表是一种基于连续内存地址存储数据的数据结构,其底层逻辑是数组。文章详细讲解了静态顺序表和动态顺序表的区别,并重点介绍了动态顺序表的实现,包括初始化、销毁、打印、增删查改等操作。最后,文章总结了顺序表的时间复杂度和局限性,并预告了后续关于链表的内容。
32 3
|
24天前
|
Java C++ 索引
让星星⭐月亮告诉你,LinkedList和ArrayList底层数据结构及方法源码说明
`LinkedList` 和 `ArrayList` 是 Java 中两种常见的列表实现。`LinkedList` 基于双向链表,适合频繁的插入和删除操作,但按索引访问元素效率较低。`ArrayList` 基于动态数组,支持快速随机访问,但在中间位置插入或删除元素时性能较差。两者均实现了 `List` 接口,`LinkedList` 还额外实现了 `Deque` 接口,提供了更多队列操作。
21 3
|
7天前
|
C语言
【数据结构】双向带头循环链表(c语言)(附源码)
本文介绍了双向带头循环链表的概念和实现。双向带头循环链表具有三个关键点:双向、带头和循环。与单链表相比,它的头插、尾插、头删、尾删等操作的时间复杂度均为O(1),提高了运行效率。文章详细讲解了链表的结构定义、方法声明和实现,包括创建新节点、初始化、打印、判断是否为空、插入和删除节点等操作。最后提供了完整的代码示例。
24 0
|
3月前
|
存储 缓存 编译器
Linux源码阅读笔记06-RCU机制和内存优化屏障
Linux源码阅读笔记06-RCU机制和内存优化屏障
|
3月前
|
测试技术
【初阶数据结构篇】栈的实现(附源码)
在每一个方法的第一排都使用assert宏来判断ps是否为空(避免使用时传入空指针,后续解引用都会报错)。
38 2
|
3月前
|
消息中间件 Kubernetes 监控
实时计算 Flink版操作报错合集之在编译源码时遇到报错:无法访问,该如何处理
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。