Flink运行时之统一的数据交换对象

简介: 统一的数据交换对象 在Flink的执行引擎中,流动的元素主要有两种:缓冲(Buffer)和事件(Event)。Buffer主要针对用户数据交换,而Event则用于一些特殊的控制标识。但在实现时,为了在通信层统一数据交换,Flink提供了数据交换对象——BufferOrEvent。

统一的数据交换对象

在Flink的执行引擎中,流动的元素主要有两种:缓冲(Buffer)和事件(Event)。Buffer主要针对用户数据交换,而Event则用于一些特殊的控制标识。但在实现时,为了在通信层统一数据交换,Flink提供了数据交换对象——BufferOrEvent。它是一个既可以表示Buffer又可以表示Event的类。上层使用者只需调用isBuffer和isEvent方法即可判断当前收到的这条数据是Buffer还是Event。

缓冲

缓冲(Buffer)是数据交换的载体,几乎所有的数据(当然事件是特殊的)交换都需要经过Buffer。Buffer底层依赖于Flink自管理内存的内存段(MemorySegment)作为数据的容器。Buffer在内存段上做了一层封装,这一层封装是为了对基于引用计数的Buffer回收机制提供支持。

引用计数是计算机编程语言中的一种内存管理技术,是指将资源(可以是对象、内存或磁盘)的被引用次数保存起来,当被引用次数变为零时就将其释放的过程。使用引用计数技术可以实现自动资源管理的目的。具体做法可简述为:当创建一个对象的实例并在堆上申请内存时,对象的引用计数就为1,在其他对象中需要持有这个对象时,就需要把该对象的引用计数加1,需要释放一个对象时,就将该对象的引用计数减1,直至对象的引用计数为0,对象的内存会被释放。

引用计数还可以指使用引用计数技术回收未使用资源的垃圾回收算法,Objective-C就是使用这种方式进行内存管理的典型语言之一。

它在内部维护着一个计数器referenceCount,初始值为1。内存回收由缓冲回收器(BufferRecycler)来完成,回收的对象就是内存段(MemorySegment)。

实现引用计数的方法有两个。第一个为retain,用于将引用计数加一:

public Buffer retain() {   
    synchronized (recycleLock) {
        //预防性检测,先确认内存段是否已被回收      
        ensureNotRecycled();      
        referenceCount++;      
        return this;   
    }
}

第二个为回收(或将引用计数减一)的方法recycle,当引用计数减为0时,BufferRecycler会对内存段进行回收:

public void recycle() {   
    synchronized (recycleLock) {      
        if (--referenceCount == 0) {         
            recycler.recycle(memorySegment);      
        }   
    }
}

BufferRecycler接口有一个名为FreeingBufferRecycler的简单实现者,它的做法是直接释放内存段。当然通常为了分配和回收的效率,会对Buffer进行预先分配然后加入到Buffer池中。所以,BufferRecycler的常规实现是基于缓冲池的。除此之外,还有另一个接口BufferProvider(它约定了Buffer提供者如何以同步和异步的模式提供Buffer)共同作为缓冲池(BufferPool)的基接口。

整个的Buffer簇的类图如下:

buffer-package-diagram

缓冲池工厂(BufferPoolFactory)用于创建和销毁缓冲池,网络缓冲池(NetworkBufferPool)是其唯一的实现者。NetworkBufferPool缓存了固定数目的内存段,主要用于网络栈通信。

NetworkBufferPool在构造器的参数中要求指定其缓存的内存段数目,然后它会初始化固定大小的一个队列作为内存段池。与此同时,构造器参数还允许指定内存段大小以及Flink自主管理的内存类型。并根据这些参数初始化队列中的内存段:

if (memoryType == MemoryType.HEAP) {   
    for (int i = 0; i < numberOfSegmentsToAllocate; i++) {      
        byte[] memory = new byte[segmentSize];      
        availableMemorySegments.add(MemorySegmentFactory.wrapPooledHeapMemory(memory, null));   
    }
} else if (memoryType == MemoryType.OFF_HEAP) {   
    for (int i = 0; i < numberOfSegmentsToAllocate; i++) {      
        ByteBuffer memory = ByteBuffer.allocateDirect(segmentSize);      
        availableMemorySegments.add(MemorySegmentFactory.wrapPooledOffHeapMemory(memory, null));   
    }
}

上面的代码段中调用了我们在分析内存管理时分析的内存段工厂(MemorySegmentFactory),注意这里wrapPooledXXX方法其实没什么特殊的,只是新建了相关的内存段实例。不要被其方法名迷惑,所谓的池化的机制都是要在外部维护,比如这里的NetworkBufferPool定义了维护内存段池(也即availableMemorySegments)的一系列方法,比如requestMemorySegment、recycle、destroy等。

因为BufferPool当前只有LocalBufferPool这一个实现,所以NetworkBufferPool在实现BufferPoolFactory的createBufferPool方法时会直接实例化LocalBufferPool。NetworkBufferPool用一个Set维护了其所创建的所有LocalBufferPool的引用。createBufferPool方法要求在创建时指定需要创建的是固定大小的BufferPool还是非固定大小的BufferPool。如果是非固定大小的,NetworkBufferPool也专门提供了一个Set来维护它们,这主要是为了在创建或销毁BufferPool时对这些非固定大小的BufferPool里的Buffer进行“重分布”。

这里对非固定大小的BufferPool里的内存段进行重分布值得我们重点关注一下。其实,所有BufferPool所申请的内存段都归属于NetworkBufferPool所维护的内存段池。只有NetworkBufferPool了解内存段池的所有信息,包括剩余可用的内存段数目。当createBufferPool方法或者destroyBufferPool方法被调用时,对应的可用的内存段数目也会相应得产生变化。这时,为了让内存段被合理地分配并加以利用,所有非固定大小的BufferPool都需要根据最新的可用内存段数来重分布其所包含的内存段数目。具体的重分布的实现如下:

private void redistributeBuffers() throws IOException {
    //获得非固定大小的BufferPool个数   
    int numManagedBufferPools = managedBufferPools.size();   
    //如果没有,则直接返回,避免除零错误
    if (numManagedBufferPools == 0) {      
        return; 
    }   

    //当前总共可用的内存段数目(未实际分配)
    int numAvailableMemorySegment = totalNumberOfMemorySegments - numTotalRequiredBuffers;   
    //每个BufferPool可额外附“赠”的内存段数目   
    int numExcessBuffersPerPool = numAvailableMemorySegment / numManagedBufferPools;   
    //当然可用的内存段不一定正好能完全分摊给所有的非固定大小的BufferPool,所以剩下的余量以轮转的方式分摊
    int numLeftoverBuffers = numAvailableMemorySegment % numManagedBufferPools;   
    int bufferPoolIndex = 0;   
    //遍历BufferPool挨个扩充
    for (LocalBufferPool bufferPool : managedBufferPools) {      
        int leftoverBuffers = bufferPoolIndex++ < numLeftoverBuffers ? 1 : 0;      
        bufferPool.setNumBuffers(
            bufferPool.getNumberOfRequiredMemorySegments() + numExcessBuffersPerPool + leftoverBuffers
        );   
    }
}

该方法的调用环境必须处于同步块中。

LocalBufferPool的setNumBuffers方法并不只是设置一下数目这么简单,具体的逻辑我们暂且按下不表。我们先来看一下LocalBufferPool的实现,它用于管理从NetworkBufferPool申请到的一组Buffer实例。LocalBufferPool中维护着的一些信息:

//当前缓冲区池最少需要的内存段的数目
private final int numberOfRequiredMemorySegments;

//当前可用的内存段,这些内存段已从网络缓冲池中请求到本地,但当前没有被当做缓冲区用于数据传输
private final Queue<MemorySegment> availableMemorySegments = new ArrayDeque<MemorySegment>();

//注册过的获取Buffer可用性的侦听器,当无Buffer可用时,才可注册侦听器
private final Queue<EventListener<Buffer>> registeredListeners = new ArrayDeque<EventListener<Buffer>>();

//缓冲池的当前大小
private int currentPoolSize;

//从网络缓冲池请求的以及以某种形式关联着的所有的内存段数目
private int numberOfRequestedMemorySegments;

LocalBufferPool被实例化时,虽然指定了其所需要的内存段的最小数目,但是NetworkBufferPool并没有将这些内存段实例分配给它,也就是说不是预先静态分配的,而是调用方调用requestBuffer方法(来自BufferProvider接口),在内部触发对NetworkBufferPool的实例方法requestMemorySegment的调用进而获取到内存段。我们来看一下requestBuffer方法的实现:

private Buffer requestBuffer(boolean isBlocking) throws InterruptedException, IOException {   
    synchronized (availableMemorySegments) {
        //在请求之前,可能需要先返还多余的,也就是超出currentPoolSize的内存段给NetworkBufferPool
        returnExcessMemorySegments();      
        boolean askToRecycle = owner != null;

        //当可用内存段队列为空时,说明已没有空闲的内存段,则可能需要从NetworkBufferPool获取      
        while (availableMemorySegments.isEmpty()) {         
            if (isDestroyed) {            
                throw new IllegalStateException("Buffer pool is destroyed.");         
            }         
            //获取的条件是:所请求的总内存段的数目小于当前池大小
            if (numberOfRequestedMemorySegments < currentPoolSize) {            
                //请求一个内存段
                final MemorySegment segment = networkBufferPool.requestMemorySegment();            
                if (segment != null) {               
                    //所请求的内存段总数目加一,并将请求的内存段加入到可用内存段队列中,然后跳出本轮while循环
                    //注意这里是continue而不是break,这里还必须继续判断队列中是否有元素可用,
                    //因为当前对象可能处于分布式的场景下
                    numberOfRequestedMemorySegments++;               
                    availableMemorySegments.add(segment);               
                    continue;            
                }         
            }

            //如果总内存段的数目已大于等于本地缓冲池大小,判断是否需要释放,如果需要,让缓冲区池归属者释放一个内存段         
            if (askToRecycle) {            
                owner.releaseMemory(1);         
            }
            //如果是阻塞式的请求模式,则对当前队列阻塞等待两秒钟,接着仍然继续while循环         
            if (isBlocking) {            
                availableMemorySegments.wait(2000);         
            } else {            
                //否则,直接返回空并退出循环
                return null;         
            }      
        }

        //当有可用内存段时,直接从队列中获取内存段并新建一个Buffer实例      
        return new Buffer(availableMemorySegments.poll(), this);   
    }
}

在上文我们介绍过,在NetworkBufferPool中创建或者销毁BufferPool时,所有非固定大小的BufferPool会被重分布。在分析其实现是,我们看到了它会调用LocalBufferPool的实例方法setNumBuffers,该方法会调整本地缓冲池的大小,并可能会对其所申请的内存段数目产生影响:

public void setNumBuffers(int numBuffers) throws IOException {   
    synchronized (availableMemorySegments) {
        //重分布后新的Buffer数量不得小于最小要求的内存段数量      
        checkArgument(numBuffers >= numberOfRequiredMemorySegments, 
            "Buffer pool needs at least " + numberOfRequiredMemorySegments + 
            " buffers, but tried to set to " + numBuffers + ".");      
        //修改缓冲池容量
        currentPoolSize = numBuffers;      
        //如果当前保有的内存段数目大于新的缓冲池容量,则将超出部分归还
        //注意这里归还并不是精确强制归还的,当本地缓冲池中没有多余的内存段时,归还动作将会终止
        returnExcessMemorySegments();      
        //这是第二重保险,如果Buffer存在归属者且此时本地缓冲区池中保有的内存段仍然大于缓冲池容量
        //则会对多余的内存段进行释放
        if (owner != null && numberOfRequestedMemorySegments > currentPoolSize) {         
            owner.releaseMemory(numberOfRequestedMemorySegments - numBuffers);      
        }   
    }
}

Buffer是数据交换的载体,在所有涉及到数据交换的地方都会用到它。因此理解其相关的实现对于,理解Flink的整个数据流交换体系非常有帮助。

事件

Flink的数据流中不仅仅只有用户的数据,还包含了一些特殊的事件,这些事件都是由算子注入到数据流中的。它们在每个流分区里伴随着其他的数据元素而被有序地分发。接收到这些事件的算子会对这些事件给出响应,典型的事件类型有:

  • 检查点屏障:用于隔离多个检查点之间的数据,保障快照数据的一致性;
  • 迭代屏障:标识流分区已到达了一个超级步的结尾;
  • 子分区数据结束标记:当消费任务获取到该事件时,表示其所消费的对应的分区中的数据已被全部消费完成;

事件假设一个流分区维持着元素顺序。鉴于此,在Flink中一元算子在消费单一流分区时,能够保证FIFO(先进先出)的元素顺序。而为了保证流处理的速率同时避免反压,算子有时会接收超过一个流分区的元素并将它们合并。综合各种场景,Flink中的数据流在任何形式的重分区或广播之后不提供顺序保证。而对无序元素的处理任务交给算子自行实现。

在Flink中所有事件的最终基类都是AbstractEvent。AbstractEvent这一抽象类又派生出另一个抽象类RuntimeEvent,几乎所有预先内置的事件都直接派生于此。除了预定义的事件外,Flink还支持自定义的扩展事件,所有自定义的事件都继承自派生于AbstractEvent的TaskEvent。总结一下,其类继承关系图如下:

Event-class-diagram

上图中继承自RuntimeEvent的三个事件类就是上文列举的典型事件。其中只有CheckpointBarrier包含检查点编号和时间戳这两个属性,其他两个事件类主要起到标识作用。



原文发布时间为:2016-12-20


本文作者:vinoYang


本文来自云栖社区合作伙伴CSDN博客,了解相关信息可以关注CSDN博客。

相关实践学习
基于Hologres+Flink搭建GitHub实时数据大屏
通过使用Flink、Hologres构建实时数仓,并通过Hologres对接BI分析工具(以DataV为例),实现海量数据实时分析.
实时计算 Flink 实战课程
如何使用实时计算 Flink 搞定数据处理难题?实时计算 Flink 极客训练营产品、技术专家齐上阵,从开源 Flink功能介绍到实时计算 Flink 优势详解,现场实操,5天即可上手! 欢迎开通实时计算 Flink 版: https://cn.aliyun.com/product/bigdata/sc Flink Forward Asia 介绍: Flink Forward 是由 Apache 官方授权,Apache Flink Community China 支持的会议,通过参会不仅可以了解到 Flink 社区的最新动态和发展计划,还可以了解到国内外一线大厂围绕 Flink 生态的生产实践经验,是 Flink 开发者和使用者不可错过的盛会。 去年经过品牌升级后的 Flink Forward Asia 吸引了超过2000人线下参与,一举成为国内最大的 Apache 顶级项目会议。结合2020年的特殊情况,Flink Forward Asia 2020 将在12月26日以线上峰会的形式与大家见面。
目录
相关文章
|
5月前
|
SQL 人工智能 JSON
Flink 2.1 SQL:解锁实时数据与AI集成,实现可扩展流处理
简介:本文整理自阿里云高级技术专家李麟在Flink Forward Asia 2025新加坡站的分享,介绍了Flink 2.1 SQL在实时数据处理与AI融合方面的关键进展,包括AI函数集成、Join优化及未来发展方向,助力构建高效实时AI管道。
923 43
|
5月前
|
SQL 人工智能 JSON
Flink 2.1 SQL:解锁实时数据与AI集成,实现可扩展流处理
本文整理自阿里云的高级技术专家、Apache Flink PMC 成员李麟老师在 Flink Forward Asia 2025 新加坡[1]站 —— 实时 AI 专场中的分享。将带来关于 Flink 2.1 版本中 SQL 在实时数据处理和 AI 方面进展的话题。
382 0
Flink 2.1 SQL:解锁实时数据与AI集成,实现可扩展流处理
|
9月前
|
存储 消息中间件 Kafka
基于 Flink 的中国电信星海时空数据多引擎实时改造
本文整理自中国电信集团大数据架构师李新虎老师在Flink Forward Asia 2024的分享,围绕星海时空智能系统展开,涵盖四个核心部分:时空数据现状、实时场景多引擎化、典型应用及未来展望。系统日处理8000亿条数据,具备亚米级定位能力,通过Flink多引擎架构解决数据膨胀与响应时效等问题,优化资源利用并提升计算效率。应用场景包括运动状态识别、个体行为分析和群智感知,未来将推进湖仓一体改造与三维时空服务体系建设,助力数字化转型与智慧城市建设。
918 3
基于 Flink 的中国电信星海时空数据多引擎实时改造
|
5月前
|
SQL 关系型数据库 Apache
从 Flink 到 Doris 的实时数据写入实践 —— 基于 Flink CDC 构建更实时高效的数据集成链路
本文将深入解析 Flink-Doris-Connector 三大典型场景中的设计与实现,并结合 Flink CDC 详细介绍了整库同步的解决方案,助力构建更加高效、稳定的实时数据处理体系。
2461 0
从 Flink 到 Doris 的实时数据写入实践 —— 基于 Flink CDC 构建更实时高效的数据集成链路
|
6月前
|
存储 消息中间件 搜索推荐
京东零售基于Flink的推荐系统智能数据体系
摘要:本文整理自京东零售技术专家张颖老师,在 Flink Forward Asia 2024 生产实践(二)专场中的分享,介绍了基于Flink构建的推荐系统数据,以及Flink智能体系带来的智能服务功能。内容分为以下六个部分: 推荐系统架构 索引 样本 特征 可解释 指标 Tips:关注「公众号」回复 FFA 2024 查看会后资料~
463 1
京东零售基于Flink的推荐系统智能数据体系
|
10月前
|
Oracle 关系型数据库 Java
【YashanDB知识库】Flink CDC实时同步Oracle数据到崖山
本文介绍通过Flink CDC实现Oracle数据实时同步至崖山数据库(YashanDB)的方法,支持全量与增量同步,并涵盖新增、修改和删除的DML操作。内容包括环境准备(如JDK、Flink版本等)、Oracle日志归档启用、用户权限配置、增量日志记录设置、元数据迁移、Flink安装与配置、生成Flink SQL文件、Streampark部署,以及创建和启动实时同步任务的具体步骤。适合需要跨数据库实时同步方案的技术人员参考。
【YashanDB知识库】Flink CDC实时同步Oracle数据到崖山
|
11月前
|
Java 关系型数据库 MySQL
SpringBoot 通过集成 Flink CDC 来实时追踪 MySql 数据变动
通过详细的步骤和示例代码,您可以在 SpringBoot 项目中成功集成 Flink CDC,并实时追踪 MySQL 数据库的变动。
2837 45
|
存储 监控 数据处理
flink 向doris 数据库写入数据时出现背压如何排查?
本文介绍了如何确定和解决Flink任务向Doris数据库写入数据时遇到的背压问题。首先通过Flink Web UI和性能指标监控识别背压,然后从Doris数据库性能、网络连接稳定性、Flink任务数据处理逻辑及资源配置等方面排查原因,并通过分析相关日志进一步定位问题。
991 61
|
10月前
|
消息中间件 关系型数据库 Kafka
阿里云基于 Flink CDC 的现代数据栈云上实践
阿里云基于 Flink CDC 的现代数据栈云上实践
200 1
|
运维 数据处理 Apache
数据实时计算产品对比测评报告:阿里云实时计算Flink版
数据实时计算产品对比测评报告:阿里云实时计算Flink版